<!--- TEST_NAME FlowGuideTest -->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation basics](#flow-cancellation-basics)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Successful completion](#successful-completion)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)
  * [Flow cancellation checks](#flow-cancellation-checks)
    * [Making busy flow cancellable](#making-busy-flow-cancellable)
  * [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END -->

## Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a `simple` function that returns a [List]
of three numbers and then print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with the `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just like we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences between the code that uses [Flow] and the earlier examples:

* The builder function for the [Flow] type is called [flow][_flow].
* Code inside the `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
thread is blocked in this case.

### Flows are cold

Flows are _cold_ streams similar to sequences: the code inside a [flow][_flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

Which prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `simple` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, the `simple()` call returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" when we call `collect` again.

### Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* [flowOf] builder that defines a flow emitting a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->

### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.
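
As a minimal sketch of that difference, the predicate passed to `filter` below calls a suspending function. The names `isAllowed` and `allowedIds` are hypothetical and exist only for this illustration; they are not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, used only for illustration
suspend fun isAllowed(id: Int): Boolean {
    delay(50) // imitate an asynchronous lookup
    return id % 2 == 0
}

// Keeps only the ids accepted by the suspending predicate
fun allowedIds(): Flow<Int> = (1..4).asFlow()
    .filter { id -> isAllowed(id) } // suspending call inside an operator block

fun main() = runBlocking<Unit> {
    allowedIds().collect { id -> println(id) } // prints 2, then 4
}
```

The same predicate could not be passed to `Sequence.filter`, because a sequence operator block is an ordinary, non-suspending lambda.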

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, one appearing every second:

```text
response 1
response 2
response 3
```

<!--- TEST -->

#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
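
To make the claim about imitating [map] and [filter] concrete, here is a small sketch. The function names `evens` and `labelled` are invented for this illustration; only `transform` and `emit` come from the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same result as filter { it % 2 == 0 }: emit only the values that pass the check
fun evens(): Flow<Int> = (1..5).asFlow()
    .transform { value -> if (value % 2 == 0) emit(value) }

// Same result as map { "string $it" }: emit exactly one value per input
fun labelled(): Flow<String> = evens()
    .transform { value -> emit("string $value") }

fun main() = runBlocking<Unit> {
    labelled().collect { println(it) } // prints "string 2", then "string 4"
}
```

Emitting zero or one value per input gives filtering, and emitting exactly one gives mapping. Nothing prevents a `transform` block from emitting more than one value per input.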

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->

#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make collection easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

Prints a single number:

```text
55
```

<!--- TEST -->

### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default], and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code that uses Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
```

<!--- TEST EXCEPTION -->

#### flowOn operator

The exception refers to the [flowOn] function that should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` runs in a background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
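
One consequence of this upstream-only behavior can be sketched as follows: operators placed before `flowOn` run in the context it specifies, while operators placed after it stay in the collector's context. The function name `threadsAroundFlowOn` is made up for this illustration, and the exact thread names depend on the environment.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits a pair: (thread that ran the upstream map, thread that ran the downstream map)
fun threadsAroundFlowOn(): Flow<Pair<String, String>> = flowOf(1)
    .map { Thread.currentThread().name } // before flowOn: runs on Dispatchers.Default
    .flowOn(Dispatchers.Default)
    .map { upstream -> upstream to Thread.currentThread().name } // after flowOn: runs in the collector's context

fun main() = runBlocking<Unit> {
    threadsAroundFlowOn().collect { (upstream, downstream) ->
        println("upstream ran on $upstream, downstream ran on $downstream")
    }
}
```

When run from `main`, the first name should belong to a `DefaultDispatcher` worker thread and the second to the thread that called `runBlocking`.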

### Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by a `simple` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code,
as opposed to running them sequentially:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).

It produces the same numbers just faster, as we have effectively created a processing pipeline,
having to only wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
but here we explicitly request buffering without changing the execution context.

#### Conflation

When a flow represents partial results of the operation or operation status updates, it may not be necessary
to process each value, but instead, only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of an `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->

### Composing multiple flows

There are lots of ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->

#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), we might need to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emit a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms,
then zipping them using the [zip] operator will still produce the same result,
albeit with results printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of a [zip]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).

We get quite a different output, where a line is printed at each emission from either the `nums` or the `strs` flow:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->

### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
and hence there is a family of flattening operators on flows.

#### flatMapConcat

The concatenating mode is implemented by the [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators.
They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently; it is the equivalent of performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.

#### flatMapLatest

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output of this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.

### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

A "Caught ..." message is not printed despite there being a `catch` operator:

```text
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
```

<!--- TEST EXCEPTION -->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator.
Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

```text
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

#### Declarative handling

For the declarative approach, flow has the [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally.
In the following
example the `simple` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

#### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see the completion cause is not null, because the flow was aborted due to a downstream exception:

```text
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Imperative versus declarative

Now we know how to collect a flow, and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.
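
The point that `onEach` alone does nothing can be illustrated with a minimal sketch. The `seen` list and the
`events` variable are hypothetical names introduced only for this illustration; the sketch assumes that a
cold flow runs its `onEach` block only once a terminal operator such as `collect()` is called:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val seen = mutableListOf<Int>() // hypothetical helper list, for illustration only
    val events = (1..3).asFlow().onEach { seen += it } // only describes the reaction, runs nothing
    println("Before collect: $seen") // the onEach block has not been invoked yet
    events.collect() // the terminal operator actually runs the flow
    println("After collect: $seen")
}
```

In this sketch the first line printed shows an empty list and the second shows all three values, because the
reaction registered with `onEach` is executed only during collection.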

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch a collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
coroutine only, without cancelling the whole scope, or to [join][Job.join] it.

### Flow cancellation checks

For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use the [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

In the case where you have a busy loop with coroutines, you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of the Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible,
be Kotlin and suspension friendly, and respect structured concurrency.
Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

Although different, Flow *is* conceptually a reactive stream, and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
<!--- END -->