1<!--- TEST_NAME ChannelsGuideTest --> 2 3[//]: # (title: Channels) 4 5Deferred values provide a convenient way to transfer a single value between coroutines. 6Channels provide a way to transfer a stream of values. 7 8## Channel basics 9 10A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that 11instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of 12a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive]. 13 14```kotlin 15import kotlinx.coroutines.* 16import kotlinx.coroutines.channels.* 17 18fun main() = runBlocking { 19//sampleStart 20 val channel = Channel<Int>() 21 launch { 22 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares 23 for (x in 1..5) channel.send(x * x) 24 } 25 // here we print five received integers: 26 repeat(5) { println(channel.receive()) } 27 println("Done!") 28//sampleEnd 29} 30``` 31{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 32 33> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt). 34> 35{type="note"} 36 37The output of this code is: 38 39```text 401 414 429 4316 4425 45Done! 46``` 47 48<!--- TEST --> 49 50## Closing and iteration over channels 51 52Unlike a queue, a channel can be closed to indicate that no more elements are coming. 53On the receiver side it is convenient to use a regular `for` loop to receive elements 54from the channel. 55 56Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel. 57The iteration stops as soon as this close token is received, so there is a guarantee 58that all previously sent elements before the close are received: 59 60```kotlin 61import kotlinx.coroutines.* 62import kotlinx.coroutines.channels.* 63 64fun main() = runBlocking { 65//sampleStart 66 val channel = Channel<Int>() 67 launch { 68 for (x in 1..5) channel.send(x * x) 69 channel.close() // we're done sending 70 } 71 // here we print received values using `for` loop (until the channel is closed) 72 for (y in channel) println(y) 73 println("Done!") 74//sampleEnd 75} 76``` 77{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 78 79> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt). 80> 81{type="note"} 82 83<!--- TEST 841 854 869 8716 8825 89Done! 90--> 91 92## Building channel producers 93 94The pattern where a coroutine is producing a sequence of elements is quite common. 95This is a part of _producer-consumer_ pattern that is often found in concurrent code. 96You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary 97to common sense that results must be returned from functions. 98 99There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side, 100and an extension function [consumeEach], that replaces a `for` loop on the consumer side: 101 102```kotlin 103import kotlinx.coroutines.* 104import kotlinx.coroutines.channels.* 105 106fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { 107 for (x in 1..5) send(x * x) 108} 109 110fun main() = runBlocking { 111//sampleStart 112 val squares = produceSquares() 113 squares.consumeEach { println(it) } 114 println("Done!") 115//sampleEnd 116} 117``` 118{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 119 120> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt). 121> 122{type="note"} 123 124<!--- TEST 1251 1264 1279 12816 12925 130Done! 131--> 132 133## Pipelines 134 135A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values: 136 137```kotlin 138fun CoroutineScope.produceNumbers() = produce<Int> { 139 var x = 1 140 while (true) send(x++) // infinite stream of integers starting from 1 141} 142``` 143 144And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. 145In the example below, the numbers are just squared: 146 147```kotlin 148fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { 149 for (x in numbers) send(x * x) 150} 151``` 152 153The main code starts and connects the whole pipeline: 154 155<!--- CLEAR --> 156 157```kotlin 158import kotlinx.coroutines.* 159import kotlinx.coroutines.channels.* 160 161fun main() = runBlocking { 162//sampleStart 163 val numbers = produceNumbers() // produces integers from 1 and on 164 val squares = square(numbers) // squares integers 165 repeat(5) { 166 println(squares.receive()) // print first five 167 } 168 println("Done!") // we are done 169 coroutineContext.cancelChildren() // cancel children coroutines 170//sampleEnd 171} 172 173fun CoroutineScope.produceNumbers() = produce<Int> { 174 var x = 1 175 while (true) send(x++) // infinite stream of integers starting from 1 176} 177 178fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { 179 for (x in numbers) send(x * x) 180} 181``` 182{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 183 184> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt). 185> 186{type="note"} 187 188<!--- TEST 1891 1904 1919 19216 19325 194Done! 195--> 196 197> All functions that create coroutines are defined as extensions on [CoroutineScope], 198> so that we can rely on [structured concurrency](composing-suspending-functions.md#structured-concurrency-with-async) to make 199> sure that we don't have lingering global coroutines in our application. 200> 201{type="note"} 202 203## Prime numbers with pipeline 204 205Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline 206of coroutines. We start with an infinite sequence of numbers. 207 208```kotlin 209fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { 210 var x = start 211 while (true) send(x++) // infinite stream of integers from start 212} 213``` 214 215The following pipeline stage filters an incoming stream of numbers, removing all the numbers 216that are divisible by the given prime number: 217 218```kotlin 219fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { 220 for (x in numbers) if (x % prime != 0) send(x) 221} 222``` 223 224Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, 225and launching new pipeline stage for each prime number found: 226 227```Plain Text 228numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 229``` 230 231The following example prints the first ten prime numbers, 232running the whole pipeline in the context of the main thread. Since all the coroutines are launched in 233the scope of the main [runBlocking] coroutine 234we don't have to keep an explicit list of all the coroutines we have started. 235We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] 236extension function to cancel all the children coroutines after we have printed 237the first ten prime numbers. 238 239<!--- CLEAR --> 240 241```kotlin 242import kotlinx.coroutines.* 243import kotlinx.coroutines.channels.* 244 245fun main() = runBlocking { 246//sampleStart 247 var cur = numbersFrom(2) 248 repeat(10) { 249 val prime = cur.receive() 250 println(prime) 251 cur = filter(cur, prime) 252 } 253 coroutineContext.cancelChildren() // cancel all children to let main finish 254//sampleEnd 255} 256 257fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { 258 var x = start 259 while (true) send(x++) // infinite stream of integers from start 260} 261 262fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { 263 for (x in numbers) if (x % prime != 0) send(x) 264} 265``` 266{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 267 268> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt). 269> 270{type="note"} 271 272The output of this code is: 273 274```text 2752 2763 2775 2787 27911 28013 28117 28219 28323 28429 285``` 286 287<!--- TEST --> 288 289Note that you can build the same pipeline using 290[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html) 291coroutine builder from the standard library. 292Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`, 293`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either. 294However, the benefit of a pipeline that uses channels as shown above is that it can actually use 295multiple CPU cores if you run it in [Dispatchers.Default] context. 296 297Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some 298other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be 299built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike 300`produce`, which is fully asynchronous. 301 302## Fan-out 303 304Multiple coroutines may receive from the same channel, distributing work between themselves. 305Let us start with a producer coroutine that is periodically producing integers 306(ten numbers per second): 307 308```kotlin 309fun CoroutineScope.produceNumbers() = produce<Int> { 310 var x = 1 // start from 1 311 while (true) { 312 send(x++) // produce next 313 delay(100) // wait 0.1s 314 } 315} 316``` 317 318Then we can have several processor coroutines. In this example, they just print their id and 319received number: 320 321```kotlin 322fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { 323 for (msg in channel) { 324 println("Processor #$id received $msg") 325 } 326} 327``` 328 329Now let us launch five processors and let them work for almost a second. See what happens: 330 331<!--- CLEAR --> 332 333```kotlin 334import kotlinx.coroutines.* 335import kotlinx.coroutines.channels.* 336 337fun main() = runBlocking<Unit> { 338//sampleStart 339 val producer = produceNumbers() 340 repeat(5) { launchProcessor(it, producer) } 341 delay(950) 342 producer.cancel() // cancel producer coroutine and thus kill them all 343//sampleEnd 344} 345 346fun CoroutineScope.produceNumbers() = produce<Int> { 347 var x = 1 // start from 1 348 while (true) { 349 send(x++) // produce next 350 delay(100) // wait 0.1s 351 } 352} 353 354fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { 355 for (msg in channel) { 356 println("Processor #$id received $msg") 357 } 358} 359``` 360{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 361 362> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt). 363> 364{type="note"} 365 366The output will be similar to the the following one, albeit the processor ids that receive 367each specific integer may be different: 368 369```text 370Processor #2 received 1 371Processor #4 received 2 372Processor #0 received 3 373Processor #1 received 4 374Processor #3 received 5 375Processor #2 received 6 376Processor #4 received 7 377Processor #0 received 8 378Processor #1 received 9 379Processor #3 received 10 380``` 381 382<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } --> 383 384Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration 385over the channel that processor coroutines are doing. 386 387Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code. 388Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor 389coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach` 390always consumes (cancels) the underlying channel on its normal or abnormal completion. 391 392## Fan-in 393 394Multiple coroutines may send to the same channel. 395For example, let us have a channel of strings, and a suspending function that 396repeatedly sends a specified string to this channel with a specified delay: 397 398```kotlin 399suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { 400 while (true) { 401 delay(time) 402 channel.send(s) 403 } 404} 405``` 406 407Now, let us see what happens if we launch a couple of coroutines sending strings 408(in this example we launch them in the context of the main thread as main coroutine's children): 409 410<!--- CLEAR --> 411 412```kotlin 413import kotlinx.coroutines.* 414import kotlinx.coroutines.channels.* 415 416fun main() = runBlocking { 417//sampleStart 418 val channel = Channel<String>() 419 launch { sendString(channel, "foo", 200L) } 420 launch { sendString(channel, "BAR!", 500L) } 421 repeat(6) { // receive first six 422 println(channel.receive()) 423 } 424 coroutineContext.cancelChildren() // cancel all children to let main finish 425//sampleEnd 426} 427 428suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { 429 while (true) { 430 delay(time) 431 channel.send(s) 432 } 433} 434``` 435{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 436 437> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt). 438> 439{type="note"} 440 441The output is: 442 443```text 444foo 445foo 446BAR! 447foo 448foo 449BAR! 450``` 451 452<!--- TEST --> 453 454## Buffered channels 455 456The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver 457meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked, 458if receive is invoked first, it is suspended until send is invoked. 459 460Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to 461specify _buffer size_. Buffer allows senders to send multiple elements before suspending, 462similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full. 463 464Take a look at the behavior of the following code: 465 466```kotlin 467import kotlinx.coroutines.* 468import kotlinx.coroutines.channels.* 469 470fun main() = runBlocking<Unit> { 471//sampleStart 472 val channel = Channel<Int>(4) // create buffered channel 473 val sender = launch { // launch sender coroutine 474 repeat(10) { 475 println("Sending $it") // print before sending each element 476 channel.send(it) // will suspend when buffer is full 477 } 478 } 479 // don't receive anything... just wait.... 480 delay(1000) 481 sender.cancel() // cancel sender coroutine 482//sampleEnd 483} 484``` 485{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 486 487> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt). 488> 489{type="note"} 490 491It prints "sending" _five_ times using a buffered channel with capacity of _four_: 492 493```text 494Sending 0 495Sending 1 496Sending 2 497Sending 3 498Sending 4 499``` 500 501<!--- TEST --> 502 503The first four elements are added to the buffer and the sender suspends when trying to send the fifth one. 504 505## Channels are fair 506 507Send and receive operations to channels are _fair_ with respect to the order of their invocation from 508multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive` 509gets the element. In the following example two coroutines "ping" and "pong" are 510receiving the "ball" object from the shared "table" channel. 511 512```kotlin 513import kotlinx.coroutines.* 514import kotlinx.coroutines.channels.* 515 516//sampleStart 517data class Ball(var hits: Int) 518 519fun main() = runBlocking { 520 val table = Channel<Ball>() // a shared table 521 launch { player("ping", table) } 522 launch { player("pong", table) } 523 table.send(Ball(0)) // serve the ball 524 delay(1000) // delay 1 second 525 coroutineContext.cancelChildren() // game over, cancel them 526} 527 528suspend fun player(name: String, table: Channel<Ball>) { 529 for (ball in table) { // receive the ball in a loop 530 ball.hits++ 531 println("$name $ball") 532 delay(300) // wait a bit 533 table.send(ball) // send the ball back 534 } 535} 536//sampleEnd 537``` 538{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 539 540> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt). 541> 542{type="note"} 543 544The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping" 545coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets 546received by the "pong" coroutine, because it was already waiting for it: 547 548```text 549ping Ball(hits=1) 550pong Ball(hits=2) 551ping Ball(hits=3) 552pong Ball(hits=4) 553``` 554 555<!--- TEST --> 556 557Note that sometimes channels may produce executions that look unfair due to the nature of the executor 558that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details. 559 560## Ticker channels 561 562Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel. 563Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] 564pipelines and operators that do windowing and other time-dependent processing. 565Ticker channel can be used in [select] to perform "on tick" action. 566 567To create such channel use a factory method [ticker]. 568To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it. 569 570Now let's see how it works in practice: 571 572```kotlin 573import kotlinx.coroutines.* 574import kotlinx.coroutines.channels.* 575 576//sampleStart 577fun main() = runBlocking<Unit> { 578 val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel 579 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } 580 println("Initial element is available immediately: $nextElement") // no initial delay 581 582 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay 583 println("Next element is not ready in 50 ms: $nextElement") 584 585 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 586 println("Next element is ready in 100 ms: $nextElement") 587 588 // Emulate large consumption delays 589 println("Consumer pauses for 150ms") 590 delay(150) 591 // Next element is available immediately 592 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } 593 println("Next element is available immediately after large consumer delay: $nextElement") 594 // Note that the pause between `receive` calls is taken into account and next element arrives faster 595 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 596 println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") 597 598 tickerChannel.cancel() // indicate that no more elements are needed 599} 600//sampleEnd 601``` 602{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 603 604> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt). 605> 606{type="note"} 607 608It prints following lines: 609 610```text 611Initial element is available immediately: kotlin.Unit 612Next element is not ready in 50 ms: null 613Next element is ready in 100 ms: kotlin.Unit 614Consumer pauses for 150ms 615Next element is available immediately after large consumer delay: kotlin.Unit 616Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit 617``` 618 619<!--- TEST --> 620 621Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element 622delay if a pause occurs, trying to maintain a fixed rate of produced elements. 623 624Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed 625delay between elements. 626 627<!--- MODULE kotlinx-coroutines-core --> 628<!--- INDEX kotlinx.coroutines --> 629 630[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html 631[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html 632[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-children.html 633[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html 634 635<!--- INDEX kotlinx.coroutines.channels --> 636 637[Channel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html 638[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html 639[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html 640[SendChannel.close]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html 641[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html 642[consumeEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html 643[Channel()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html 644[ticker]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html 645[ReceiveChannel.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html 646[TickerMode.FIXED_DELAY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y/index.html 647 648<!--- INDEX kotlinx.coroutines.selects --> 649 650[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html 651 652<!--- END --> 653