<!--- TEST_NAME FlowGuideTest -->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation basics](#flow-cancellation-basics)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Successful completion](#successful-completion)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)
  * [Flow cancellation checks](#flow-cancellation-checks)
    * [Making busy flow cancellable](#making-busy-flow-cancellable)
  * [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END -->

## Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a `simple` function that returns a [List]
of three numbers and then print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type just like we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences between the code with the [Flow] and the earlier examples:

* The builder function for the [Flow] type is called [flow][_flow].
* Code inside the `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
thread is blocked in this case.

### Flows are cold

Flows are _cold_ streams similar to sequences: the code inside a [flow][_flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

This prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `simple` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, the `simple()` call returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" when we call `collect` again.

### Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.
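
As a minimal sketch of the same idea, the return value of [withTimeoutOrNull] can be used to tell a timeout apart from a normal completion. The `collectWithin` helper is a hypothetical name introduced only for this illustration, and the `simple` flow is assumed to be the same 100 ms-per-element flow as above (without the logging):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Hypothetical helper: collects the whole flow into a list, or returns null on timeout
suspend fun collectWithin(timeMillis: Long): List<Int>? =
    withTimeoutOrNull(timeMillis) { simple().toList() }

fun main() = runBlocking<Unit> {
    println(collectWithin(250)) // the flow needs about 300 ms, so this times out
    println(collectWithin(1000)) // enough time for all three values
}
```

The first call is expected to print `null`, because the collection is cancelled before the third value arrives, and the second call is expected to print `[1, 2, 3]`.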

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* The [flowOf] builder defines a flow that emits a fixed set of values.
* Various collections and sequences can be converted to flows using the `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->
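
The other two builders mentioned in the list can be sketched in the same way. The values used here are arbitrary and chosen only for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // flowOf: a flow that emits a fixed set of values
    flowOf("a", "b", "c").collect { value -> println(value) }
    // asFlow: convert an existing collection to a flow
    listOf(10, 20, 30).asFlow().collect { value -> println(value) }
}
```

This should print `a`, `b`, `c`, and then `10`, `20`, `30`, each on its own line.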

### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, each appearing one second after the previous one:

```text
response 1
response 2
response 3
```

<!--- TEST -->
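
The same holds for [filter]. The sketch below assumes a hypothetical suspending predicate, `isAllowed`, that stands in for any check that takes time to answer; it is not part of the library:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, introduced only for this illustration
suspend fun isAllowed(request: Int): Boolean {
    delay(100) // imitate asynchronous work
    return request % 2 == 1
}

fun main() = runBlocking<Unit> {
    (1..5).asFlow() // a flow of requests
        .filter { request -> isAllowed(request) } // suspending call inside filter
        .map { request -> "request $request" }
        .collect { value -> println(value) }
}
```

With this predicate only the odd requests pass, so the expected output is `request 1`, `request 3`, and `request 5`.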

#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->
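
To illustrate the claim that `transform` can imitate [map] and [filter], here is a rough sketch. The names `mapWithTransform` and `filterWithTransform` are hypothetical and are not how the library itself necessarily implements these operators:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helpers, named so that they do not clash with the library's map and filter
fun <T, R> Flow<T>.mapWithTransform(block: suspend (T) -> R): Flow<R> =
    transform<T, R> { value -> emit(block(value)) } // exactly one emission per input

fun <T> Flow<T>.filterWithTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform<T, T> { value -> if (predicate(value)) emit(value) } // zero or one emission per input

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterWithTransform { it % 2 == 1 } // keep odd numbers
        .mapWithTransform { it * 10 } // multiply each by ten
        .collect { value -> println(value) }
}
```

The expected output is `10`, `30`, and `50`, the same as with the built-in `filter` and `map`.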

#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->
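
Because [take] cancels the upstream flow, it can also bound a flow that would otherwise never complete. The `naturals` function below is a hypothetical example written for this sketch:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical infinite flow: it never completes on its own
fun naturals(): Flow<Int> = flow {
    var i = 1
    while (true) {
        emit(i++)
    }
}

fun main() = runBlocking<Unit> {
    // take(3) stops the infinite loop after the third emission
    println(naturals().take(3).toList())
}
```

This is expected to print `[1, 2, 3]` and then terminate, since the loop in `naturals` is cancelled once the limit is reached.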

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make collection easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

Prints a single number:

```text
55
```

<!--- TEST -->
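
The other terminal operators from the list can be tried in the same way. This is only a small sketch with arbitrarily chosen values:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val numbers = flowOf(1, 2, 2, 3)
    println(numbers.toList()) // [1, 2, 2, 3]
    println(numbers.toSet()) // [1, 2, 3]
    println(numbers.first()) // 1
    println(flowOf(42).single()) // 42
    println(numbers.fold(100) { acc, value -> acc + value }) // 108
}
```

Each of these calls starts a new collection of the flow, which is possible because the flow is cold.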

### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator after.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...
```

<!--- TEST EXCEPTION -->

#### flowOn operator

The exception refers to the [flowOn] function, which should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
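
Since [flowOn] applies to the upstream flow, operators placed after it still run in the collector's context. The following sketch, which reuses the `log` helper from the example above, is one way to observe this; the exact thread names depend on the environment, so no output is listed:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .map { log("Mapping $it"); it * 2 } // upstream of flowOn: runs on Dispatchers.Default
        .flowOn(Dispatchers.Default)
        .onEach { log("Received $it") } // downstream of flowOn: runs in the collector's context
        .collect { log("Collected $it") }
}
```

The "Mapping" lines should be logged from a default dispatcher worker thread, while the "Received" and "Collected" lines should be logged from the thread that runs `runBlocking`.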

### Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by a `simple` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

We can use a [buffer] operator on a flow to run the emitting code of the `simple` flow concurrently with the collecting code,
as opposed to running them sequentially:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).

It produces the same numbers just faster, as we have effectively created a processing pipeline,
having to only wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
but here we explicitly request buffering without changing the execution context.

#### Conflation

When a flow represents partial results of the operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->
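
Another member of the `xxxLatest` family is `mapLatest`, which behaves in the same way for a mapping block. This is a minimal sketch with the same timings as above; the opt-in annotation is an assumption about the library version in use, where `mapLatest` may be marked as experimental:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class) // mapLatest may require opt-in, depending on the library version
fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .onEach { delay(100) } // a new value every 100 ms
        .mapLatest { value ->
            delay(300) // cancelled if a newer value arrives within 300 ms
            "Mapped $value"
        }
        .collect { value -> println(value) }
}
```

Only the block for the last value runs to completion, so the expected output is the single line `Mapped 3`.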

### Composing multiple flows

There are lots of ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->
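
When the two flows have different lengths, the zipped flow completes as soon as the shorter one does, which mirrors how `zip` behaves for sequences. A small sketch with an intentionally longer `nums` flow:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..5).asFlow() // five numbers
    val strs = flowOf("one", "two", "three") // only three strings
    nums.zip(strs) { a, b -> "$a -> $b" } // pairs stop when strs completes
        .collect { println(it) }
}
```

The numbers 4 and 5 have no partner, so the output should be the same three lines as in the example above.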
1000
#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), you may need to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but the strings update every 400 ms,
then zipping them using the [zip] operator still produces the same result,
albeit with one line printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, if we use the [combine] operator here instead of [zip]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).

We get quite a different output, where a line is printed at each emission from either the `nums` or the `strs` flow:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->

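One consequence worth spelling out is that [combine] has nothing to emit until every upstream flow has produced at least one value. The following sketch illustrates this with a hypothetical helper `combineLate`, whose delays are chosen (as an assumption of the example) so that no two emissions coincide:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper; the timings are picked so that emissions never coincide.
suspend fun combineLate(): List<String> {
    val nums = (1..3).asFlow().onEach { delay(100) }  // 1, 2, 3 at about 100, 200, 300 ms
    val strs = flowOf("a", "b").onEach { delay(250) } // "a", "b" at about 250, 500 ms
    return nums.combine(strs) { n, s -> "$n -> $s" }.toList()
}

fun main() = runBlocking<Unit> {
    println(combineLate()) // [2 -> a, 3 -> a, 3 -> b]
}
```

With these timings the number 1 never shows up in the result, because `strs` had not emitted anything yet when it arrived and 2 had already replaced it by the time "a" was emitted.
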
### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
and so there is a family of flattening operators on flows.

#### flatMapConcat

Concatenating mode is implemented by the [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

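As a rough sketch of how the two operators relate, `flatMapConcat { ... }` produces the same sequence of values as `map { ... }` followed by [flattenConcat]. The helper names below are hypothetical, and `requestFlow` uses a shortened delay to keep the run brief. Depending on the library version, the compiler may ask you to opt in to these operators as preview or experimental API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shortened from the 500 ms used above
    emit("$i: Second")
}

// Hypothetical helper: flattens in a single step.
suspend fun viaFlatMapConcat(): List<String> =
    (1..3).asFlow().flatMapConcat { requestFlow(it) }.toList()

// Hypothetical helper: builds a Flow<Flow<String>> first, then flattens it.
suspend fun viaFlattenConcat(): List<String> =
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat().toList()

fun main() = runBlocking<Unit> {
    println(viaFlatMapConcat())
    // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]
    println(viaFlatMapConcat() == viaFlattenConcat()) // true
}
```
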
#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

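To see the effect of the `concurrency` parameter, the sketch below (with a hypothetical helper `mergeWith` and a shortened delay in `requestFlow`) limits the operator to one inner flow at a time. With `concurrency = 1` the inner flows are collected one after another, so the order matches the one produced by concatenation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shortened from the 500 ms used above
    emit("$i: Second")
}

// Hypothetical helper: merges the inner flows with the given concurrency limit.
suspend fun mergeWith(concurrency: Int): List<String> =
    (1..3).asFlow()
        .flatMapMerge(concurrency) { requestFlow(it) }
        .toList()

fun main() = runBlocking<Unit> {
    // One inner flow at a time: every "Second" precedes the next "First".
    println(mergeWith(concurrency = 1))
    // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]

    // Several inner flows at once: values from different inner flows may interleave.
    println(mergeWith(concurrency = 3))
}
```
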
> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently. It is the equivalent of performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.

#### flatMapLatest

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is a corresponding "Latest"
flattening mode, where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output of this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.

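To make that difference visible, the following sketch adds a hypothetical 200 ms `delay` to the block itself (the helper name `latestWithSlowBlock` is also made up). Since a new number arrives every 100 ms, the block is cancelled for the first two numbers before `requestFlow` is even called, and only the flow for the last number is collected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: the transformation block suspends before returning a flow.
suspend fun latestWithSlowBlock(): List<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { i ->
            delay(200) // assumed slow step; cancelled when the next number arrives
            requestFlow(i)
        }
        .toList()

fun main() = runBlocking<Unit> {
    println(latestWithSlowBlock()) // [3: First, 3: Second]
}
```
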
### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_, and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

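The first two reactions listed above can be combined in one handler. The sketch below is only an illustration: `failingFlow` and `recoverFromIo` are hypothetical names, and treating `IOException` as the recoverable case is an assumption of the example. Recoverable failures are turned into a fallback value, and everything else is rethrown to the collector:

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source that fails with the given exception after one value.
fun failingFlow(failure: Throwable): Flow<String> = flow {
    emit("value")
    throw failure
}

// Hypothetical operator: emits a fallback on I/O failures, rethrows the rest.
fun Flow<String>.recoverFromIo(): Flow<String> = catch { e ->
    if (e is IOException) emit("fallback") else throw e
}

fun main() = runBlocking<Unit> {
    println(failingFlow(IOException("disk")).recoverFromIo().toList()) // [value, fallback]
    try {
        failingFlow(IllegalStateException("bug")).recoverFromIo().toList()
    } catch (e: IllegalStateException) {
        println("Rethrown: ${e.message}") // Rethrown: bug
    }
}
```
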
#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

A "Caught ..." message is not printed despite there being a `catch` operator:

```text
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
```

<!--- TEST EXCEPTION -->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

```text
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST  -->

#### Declarative handling

For the declarative approach, flow has an [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `simple` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

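The ordering described above can be observed by recording what each operator sees. In this sketch (the helper `completionTrace` is hypothetical), an `onCompletion` placed before `catch` receives the exception, while one placed after `catch` receives `null`, because by then the exception has already been handled:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what every stage of the chain observes.
suspend fun completionTrace(): List<String> {
    val trace = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw RuntimeException("boom")
    }
        .onCompletion { cause -> trace += "before catch: $cause" } // sees the exception
        .catch { trace += "caught" }                               // handles it
        .onCompletion { cause -> trace += "after catch: $cause" }  // sees normal completion
        .collect { value -> trace += "value $value" }
    return trace
}

fun main() = runBlocking<Unit> {
    completionTrace().forEach(::println)
    // value 1
    // before catch: java.lang.RuntimeException: boom
    // caught
    // after catch: null
}
```
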
#### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see that the completion cause is not null, because the flow was aborted due to a downstream exception:

```text
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Imperative versus declarative

Now we know how to collect a flow and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch the collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated, the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow
collection coroutine, without cancelling the whole scope, or to [join][Job.join] it.

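As a small sketch of using that [Job], the hypothetical helper below collects an endless `events()` flow (also made up for the example) and cancels only the collecting coroutine after 250 ms, while the enclosing scope carries on and returns normally:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless source: a new event every 100 ms.
fun events(): Flow<Int> = flow {
    var i = 1
    while (true) {
        delay(100)
        emit(i++)
    }
}

// Hypothetical helper: collects in a child coroutine, then cancels just that child.
suspend fun collectThenCancel(): List<Int> = coroutineScope {
    val seen = mutableListOf<Int>()
    val job = events()
        .onEach { event -> seen += event }
        .launchIn(this) // returns the Job of the collecting coroutine
    delay(250) // events 1 and 2 arrive at about 100 ms and 200 ms
    job.cancelAndJoin() // stops this collection only; the scope itself stays active
    seen
}

fun main() = runBlocking<Unit> {
    println(collectThenCancel()) // [1, 2]
}
```
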
### Flow cancellation checks

For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

If you have a busy loop with coroutines, you must check for cancellation explicitly.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

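For comparison, here is a sketch of the manual `onEach` variant mentioned at the start of this section. The helper `collectUntilCancelled` is hypothetical, and it catches the resulting [CancellationException] only so that the collected values can be inspected afterwards:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values collected before cancellation was detected.
fun collectUntilCancelled(): List<Int> {
    val seen = mutableListOf<Int>()
    try {
        runBlocking<Unit> {
            (1..5).asFlow()
                .onEach { currentCoroutineContext().ensureActive() } // explicit check per value
                .collect { value ->
                    if (value == 3) cancel()
                    seen += value
                }
        }
    } catch (e: CancellationException) {
        // runBlocking reports the cancellation of its own coroutine by throwing
    }
    return seen
}

fun main() {
    println(collectUntilCancelled()) // [1, 2, 3]
}
```

The value 3 is still collected because cancellation is requested while it is being processed. The check then fails before 4 reaches the collector.
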
### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible,
be Kotlin- and suspension-friendly, and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream, and it is possible to convert it to a reactive (spec- and TCK-compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out of the box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor, and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context`, and suspension-friendly ways to work with various reactive entities.

<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
1954[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
1955[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
1956[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
1957[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
1958[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
1959[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
1960[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
1961[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
1962[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
1963[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
1964[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
1965[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
1966[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
1967[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
1968[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
1969[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
1970[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
1971[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
1972[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
1973[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
1974[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
1975[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
1976[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
1977[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
1978[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
1979<!--- END -->
1980