1<!--- TEST_NAME ChannelsGuideTest --> 2 3**Table of contents** 4 5<!--- TOC --> 6 7* [Channels](#channels) 8 * [Channel basics](#channel-basics) 9 * [Closing and iteration over channels](#closing-and-iteration-over-channels) 10 * [Building channel producers](#building-channel-producers) 11 * [Pipelines](#pipelines) 12 * [Prime numbers with pipeline](#prime-numbers-with-pipeline) 13 * [Fan-out](#fan-out) 14 * [Fan-in](#fan-in) 15 * [Buffered channels](#buffered-channels) 16 * [Channels are fair](#channels-are-fair) 17 * [Ticker channels](#ticker-channels) 18 19<!--- END --> 20 21## Channels 22 23Deferred values provide a convenient way to transfer a single value between coroutines. 24Channels provide a way to transfer a stream of values. 25 26### Channel basics 27 28A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that 29instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of 30a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive]. 31 32 33<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 34 35```kotlin 36import kotlinx.coroutines.* 37import kotlinx.coroutines.channels.* 38 39fun main() = runBlocking { 40//sampleStart 41 val channel = Channel<Int>() 42 launch { 43 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares 44 for (x in 1..5) channel.send(x * x) 45 } 46 // here we print five received integers: 47 repeat(5) { println(channel.receive()) } 48 println("Done!") 49//sampleEnd 50} 51``` 52 53</div> 54 55> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt). 56 57The output of this code is: 58 59```text 601 614 629 6316 6425 65Done! 66``` 67 68<!--- TEST --> 69 70### Closing and iteration over channels 71 72Unlike a queue, a channel can be closed to indicate that no more elements are coming. 73On the receiver side it is convenient to use a regular `for` loop to receive elements 74from the channel. 75 76Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel. 77The iteration stops as soon as this close token is received, so there is a guarantee 78that all previously sent elements before the close are received: 79 80<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 81 82```kotlin 83import kotlinx.coroutines.* 84import kotlinx.coroutines.channels.* 85 86fun main() = runBlocking { 87//sampleStart 88 val channel = Channel<Int>() 89 launch { 90 for (x in 1..5) channel.send(x * x) 91 channel.close() // we're done sending 92 } 93 // here we print received values using `for` loop (until the channel is closed) 94 for (y in channel) println(y) 95 println("Done!") 96//sampleEnd 97} 98``` 99 100</div> 101 102> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt). 103 104<!--- TEST 1051 1064 1079 10816 10925 110Done! 111--> 112 113### Building channel producers 114 115The pattern where a coroutine is producing a sequence of elements is quite common. 116This is a part of _producer-consumer_ pattern that is often found in concurrent code. 117You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary 118to common sense that results must be returned from functions. 119 120There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side, 121and an extension function [consumeEach], that replaces a `for` loop on the consumer side: 122 123<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 124 125```kotlin 126import kotlinx.coroutines.* 127import kotlinx.coroutines.channels.* 128 129fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { 130 for (x in 1..5) send(x * x) 131} 132 133fun main() = runBlocking { 134//sampleStart 135 val squares = produceSquares() 136 squares.consumeEach { println(it) } 137 println("Done!") 138//sampleEnd 139} 140``` 141 142</div> 143 144> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt). 145 146<!--- TEST 1471 1484 1499 15016 15125 152Done! 153--> 154 155### Pipelines 156 157A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values: 158 159<div class="sample" markdown="1" theme="idea" data-highlight-only> 160 161```kotlin 162fun CoroutineScope.produceNumbers() = produce<Int> { 163 var x = 1 164 while (true) send(x++) // infinite stream of integers starting from 1 165} 166``` 167 168</div> 169 170And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. 171In the example below, the numbers are just squared: 172 173<div class="sample" markdown="1" theme="idea" data-highlight-only> 174 175```kotlin 176fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { 177 for (x in numbers) send(x * x) 178} 179``` 180 181</div> 182 183The main code starts and connects the whole pipeline: 184 185<!--- CLEAR --> 186 187<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 188 189```kotlin 190import kotlinx.coroutines.* 191import kotlinx.coroutines.channels.* 192 193fun main() = runBlocking { 194//sampleStart 195 val numbers = produceNumbers() // produces integers from 1 and on 196 val squares = square(numbers) // squares integers 197 repeat(5) { 198 println(squares.receive()) // print first five 199 } 200 println("Done!") // we are done 201 coroutineContext.cancelChildren() // cancel children coroutines 202//sampleEnd 203} 204 205fun CoroutineScope.produceNumbers() = produce<Int> { 206 var x = 1 207 while (true) send(x++) // infinite stream of integers starting from 1 208} 209 210fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { 211 for (x in numbers) send(x * x) 212} 213``` 214 215</div> 216 217> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt). 218 219<!--- TEST 2201 2214 2229 22316 22425 225Done! 226--> 227 228> All functions that create coroutines are defined as extensions on [CoroutineScope], 229so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make 230sure that we don't have lingering global coroutines in our application. 231 232### Prime numbers with pipeline 233 234Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline 235of coroutines. We start with an infinite sequence of numbers. 236 237<div class="sample" markdown="1" theme="idea" data-highlight-only> 238 239```kotlin 240fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { 241 var x = start 242 while (true) send(x++) // infinite stream of integers from start 243} 244``` 245 246</div> 247 248The following pipeline stage filters an incoming stream of numbers, removing all the numbers 249that are divisible by the given prime number: 250 251<div class="sample" markdown="1" theme="idea" data-highlight-only> 252 253```kotlin 254fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { 255 for (x in numbers) if (x % prime != 0) send(x) 256} 257``` 258 259</div> 260 261Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, 262and launching new pipeline stage for each prime number found: 263 264``` 265numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 266``` 267 268The following example prints the first ten prime numbers, 269running the whole pipeline in the context of the main thread. Since all the coroutines are launched in 270the scope of the main [runBlocking] coroutine 271we don't have to keep an explicit list of all the coroutines we have started. 272We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] 273extension function to cancel all the children coroutines after we have printed 274the first ten prime numbers. 275 276<!--- CLEAR --> 277 278<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 279 280```kotlin 281import kotlinx.coroutines.* 282import kotlinx.coroutines.channels.* 283 284fun main() = runBlocking { 285//sampleStart 286 var cur = numbersFrom(2) 287 repeat(10) { 288 val prime = cur.receive() 289 println(prime) 290 cur = filter(cur, prime) 291 } 292 coroutineContext.cancelChildren() // cancel all children to let main finish 293//sampleEnd 294} 295 296fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { 297 var x = start 298 while (true) send(x++) // infinite stream of integers from start 299} 300 301fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { 302 for (x in numbers) if (x % prime != 0) send(x) 303} 304``` 305 306</div> 307 308> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt). 309 310The output of this code is: 311 312```text 3132 3143 3155 3167 31711 31813 31917 32019 32123 32229 323``` 324 325<!--- TEST --> 326 327Note that you can build the same pipeline using 328[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html) 329coroutine builder from the standard library. 330Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`, 331`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either. 332However, the benefit of a pipeline that uses channels as shown above is that it can actually use 333multiple CPU cores if you run it in [Dispatchers.Default] context. 334 335Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some 336other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be 337built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike 338`produce`, which is fully asynchronous. 339 340### Fan-out 341 342Multiple coroutines may receive from the same channel, distributing work between themselves. 343Let us start with a producer coroutine that is periodically producing integers 344(ten numbers per second): 345 346<div class="sample" markdown="1" theme="idea" data-highlight-only> 347 348```kotlin 349fun CoroutineScope.produceNumbers() = produce<Int> { 350 var x = 1 // start from 1 351 while (true) { 352 send(x++) // produce next 353 delay(100) // wait 0.1s 354 } 355} 356``` 357 358</div> 359 360Then we can have several processor coroutines. In this example, they just print their id and 361received number: 362 363<div class="sample" markdown="1" theme="idea" data-highlight-only> 364 365```kotlin 366fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { 367 for (msg in channel) { 368 println("Processor #$id received $msg") 369 } 370} 371``` 372 373</div> 374 375Now let us launch five processors and let them work for almost a second. See what happens: 376 377<!--- CLEAR --> 378 379<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 380 381```kotlin 382import kotlinx.coroutines.* 383import kotlinx.coroutines.channels.* 384 385fun main() = runBlocking<Unit> { 386//sampleStart 387 val producer = produceNumbers() 388 repeat(5) { launchProcessor(it, producer) } 389 delay(950) 390 producer.cancel() // cancel producer coroutine and thus kill them all 391//sampleEnd 392} 393 394fun CoroutineScope.produceNumbers() = produce<Int> { 395 var x = 1 // start from 1 396 while (true) { 397 send(x++) // produce next 398 delay(100) // wait 0.1s 399 } 400} 401 402fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { 403 for (msg in channel) { 404 println("Processor #$id received $msg") 405 } 406} 407``` 408 409</div> 410 411> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt). 412 413The output will be similar to the the following one, albeit the processor ids that receive 414each specific integer may be different: 415 416``` 417Processor #2 received 1 418Processor #4 received 2 419Processor #0 received 3 420Processor #1 received 4 421Processor #3 received 5 422Processor #2 received 6 423Processor #4 received 7 424Processor #0 received 8 425Processor #1 received 9 426Processor #3 received 10 427``` 428 429<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } --> 430 431Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration 432over the channel that processor coroutines are doing. 433 434Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code. 435Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor 436coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach` 437always consumes (cancels) the underlying channel on its normal or abnormal completion. 438 439### Fan-in 440 441Multiple coroutines may send to the same channel. 442For example, let us have a channel of strings, and a suspending function that 443repeatedly sends a specified string to this channel with a specified delay: 444 445<div class="sample" markdown="1" theme="idea" data-highlight-only> 446 447```kotlin 448suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { 449 while (true) { 450 delay(time) 451 channel.send(s) 452 } 453} 454``` 455 456</div> 457 458Now, let us see what happens if we launch a couple of coroutines sending strings 459(in this example we launch them in the context of the main thread as main coroutine's children): 460 461<!--- CLEAR --> 462 463<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 464 465```kotlin 466import kotlinx.coroutines.* 467import kotlinx.coroutines.channels.* 468 469fun main() = runBlocking { 470//sampleStart 471 val channel = Channel<String>() 472 launch { sendString(channel, "foo", 200L) } 473 launch { sendString(channel, "BAR!", 500L) } 474 repeat(6) { // receive first six 475 println(channel.receive()) 476 } 477 coroutineContext.cancelChildren() // cancel all children to let main finish 478//sampleEnd 479} 480 481suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { 482 while (true) { 483 delay(time) 484 channel.send(s) 485 } 486} 487``` 488 489</div> 490 491> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt). 492 493The output is: 494 495```text 496foo 497foo 498BAR! 499foo 500foo 501BAR! 502``` 503 504<!--- TEST --> 505 506### Buffered channels 507 508The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver 509meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked, 510if receive is invoked first, it is suspended until send is invoked. 511 512Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to 513specify _buffer size_. Buffer allows senders to send multiple elements before suspending, 514similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full. 515 516Take a look at the behavior of the following code: 517 518 519<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 520 521```kotlin 522import kotlinx.coroutines.* 523import kotlinx.coroutines.channels.* 524 525fun main() = runBlocking<Unit> { 526//sampleStart 527 val channel = Channel<Int>(4) // create buffered channel 528 val sender = launch { // launch sender coroutine 529 repeat(10) { 530 println("Sending $it") // print before sending each element 531 channel.send(it) // will suspend when buffer is full 532 } 533 } 534 // don't receive anything... just wait.... 535 delay(1000) 536 sender.cancel() // cancel sender coroutine 537//sampleEnd 538} 539``` 540 541</div> 542 543> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt). 544 545It prints "sending" _five_ times using a buffered channel with capacity of _four_: 546 547```text 548Sending 0 549Sending 1 550Sending 2 551Sending 3 552Sending 4 553``` 554 555<!--- TEST --> 556 557The first four elements are added to the buffer and the sender suspends when trying to send the fifth one. 558 559### Channels are fair 560 561Send and receive operations to channels are _fair_ with respect to the order of their invocation from 562multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive` 563gets the element. In the following example two coroutines "ping" and "pong" are 564receiving the "ball" object from the shared "table" channel. 565 566 567<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> 568 569```kotlin 570import kotlinx.coroutines.* 571import kotlinx.coroutines.channels.* 572 573//sampleStart 574data class Ball(var hits: Int) 575 576fun main() = runBlocking { 577 val table = Channel<Ball>() // a shared table 578 launch { player("ping", table) } 579 launch { player("pong", table) } 580 table.send(Ball(0)) // serve the ball 581 delay(1000) // delay 1 second 582 coroutineContext.cancelChildren() // game over, cancel them 583} 584 585suspend fun player(name: String, table: Channel<Ball>) { 586 for (ball in table) { // receive the ball in a loop 587 ball.hits++ 588 println("$name $ball") 589 delay(300) // wait a bit 590 table.send(ball) // send the ball back 591 } 592} 593//sampleEnd 594``` 595 596</div> 597 598> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt). 599 600The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping" 601coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets 602received by the "pong" coroutine, because it was already waiting for it: 603 604```text 605ping Ball(hits=1) 606pong Ball(hits=2) 607ping Ball(hits=3) 608pong Ball(hits=4) 609``` 610 611<!--- TEST --> 612 613Note that sometimes channels may produce executions that look unfair due to the nature of the executor 614that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details. 615 616### Ticker channels 617 618Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel. 619Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] 620pipelines and operators that do windowing and other time-dependent processing. 621Ticker channel can be used in [select] to perform "on tick" action. 622 623To create such channel use a factory method [ticker]. 624To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it. 625 626Now let's see how it works in practice: 627 628<div class="sample" markdown="1" theme="idea" data-highlight-only> 629 630```kotlin 631import kotlinx.coroutines.* 632import kotlinx.coroutines.channels.* 633 634fun main() = runBlocking<Unit> { 635 val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel 636 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } 637 println("Initial element is available immediately: $nextElement") // no initial delay 638 639 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay 640 println("Next element is not ready in 50 ms: $nextElement") 641 642 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 643 println("Next element is ready in 100 ms: $nextElement") 644 645 // Emulate large consumption delays 646 println("Consumer pauses for 150ms") 647 delay(150) 648 // Next element is available immediately 649 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } 650 println("Next element is available immediately after large consumer delay: $nextElement") 651 // Note that the pause between `receive` calls is taken into account and next element arrives faster 652 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 653 println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") 654 655 tickerChannel.cancel() // indicate that no more elements are needed 656} 657``` 658 659</div> 660 661> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt). 662 663It prints following lines: 664 665```text 666Initial element is available immediately: kotlin.Unit 667Next element is not ready in 50 ms: null 668Next element is ready in 100 ms: kotlin.Unit 669Consumer pauses for 150ms 670Next element is available immediately after large consumer delay: kotlin.Unit 671Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit 672``` 673 674<!--- TEST --> 675 676Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element 677delay if a pause occurs, trying to maintain a fixed rate of produced elements. 678 679Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed 680delay between elements. 681 682 683<!--- MODULE kotlinx-coroutines-core --> 684<!--- INDEX kotlinx.coroutines --> 685[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html 686[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html 687[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html 688[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html 689<!--- INDEX kotlinx.coroutines.channels --> 690[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html 691[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html 692[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html 693[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html 694[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html 695[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html 696[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html 697[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html 698[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html 699[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html 700<!--- INDEX kotlinx.coroutines.selects --> 701[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html 702<!--- END --> 703