• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1<!--- TEST_NAME ChannelsGuideTest -->
2
3**Table of contents**
4
5<!--- TOC -->
6
7* [Channels](#channels)
8  * [Channel basics](#channel-basics)
9  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
10  * [Building channel producers](#building-channel-producers)
11  * [Pipelines](#pipelines)
12  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
13  * [Fan-out](#fan-out)
14  * [Fan-in](#fan-in)
15  * [Buffered channels](#buffered-channels)
16  * [Channels are fair](#channels-are-fair)
17  * [Ticker channels](#ticker-channels)
18
19<!--- END -->
20
21## Channels
22
23Deferred values provide a convenient way to transfer a single value between coroutines.
24Channels provide a way to transfer a stream of values.
25
26### Channel basics
27
28A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
29instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
30a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
31
32
33<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
34
35```kotlin
36import kotlinx.coroutines.*
37import kotlinx.coroutines.channels.*
38
39fun main() = runBlocking {
40//sampleStart
41    val channel = Channel<Int>()
42    launch {
43        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
44        for (x in 1..5) channel.send(x * x)
45    }
46    // here we print five received integers:
47    repeat(5) { println(channel.receive()) }
48    println("Done!")
49//sampleEnd
50}
51```
52
53</div>
54
55> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).
56
57The output of this code is:
58
59```text
601
614
629
6316
6425
65Done!
66```
67
68<!--- TEST -->
69
70### Closing and iteration over channels
71
72Unlike a queue, a channel can be closed to indicate that no more elements are coming.
73On the receiver side it is convenient to use a regular `for` loop to receive elements
74from the channel.
75
76Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
77The iteration stops as soon as this close token is received, so there is a guarantee
78that all previously sent elements before the close are received:
79
80<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
81
82```kotlin
83import kotlinx.coroutines.*
84import kotlinx.coroutines.channels.*
85
86fun main() = runBlocking {
87//sampleStart
88    val channel = Channel<Int>()
89    launch {
90        for (x in 1..5) channel.send(x * x)
91        channel.close() // we're done sending
92    }
93    // here we print received values using `for` loop (until the channel is closed)
94    for (y in channel) println(y)
95    println("Done!")
96//sampleEnd
97}
98```
99
100</div>
101
102> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).
103
104<!--- TEST
1051
1064
1079
10816
10925
110Done!
111-->
112
113### Building channel producers
114
115The pattern where a coroutine is producing a sequence of elements is quite common.
116This is a part of _producer-consumer_ pattern that is often found in concurrent code.
117You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
118to common sense that results must be returned from functions.
119
120There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side,
121and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
122
123<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
124
125```kotlin
126import kotlinx.coroutines.*
127import kotlinx.coroutines.channels.*
128
129fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
130    for (x in 1..5) send(x * x)
131}
132
133fun main() = runBlocking {
134//sampleStart
135    val squares = produceSquares()
136    squares.consumeEach { println(it) }
137    println("Done!")
138//sampleEnd
139}
140```
141
142</div>
143
144> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).
145
146<!--- TEST
1471
1484
1499
15016
15125
152Done!
153-->
154
155### Pipelines
156
157A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
158
159<div class="sample" markdown="1" theme="idea" data-highlight-only>
160
161```kotlin
162fun CoroutineScope.produceNumbers() = produce<Int> {
163    var x = 1
164    while (true) send(x++) // infinite stream of integers starting from 1
165}
166```
167
168</div>
169
170And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
171In the example below, the numbers are just squared:
172
173<div class="sample" markdown="1" theme="idea" data-highlight-only>
174
175```kotlin
176fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
177    for (x in numbers) send(x * x)
178}
179```
180
181</div>
182
183The main code starts and connects the whole pipeline:
184
185<!--- CLEAR -->
186
187<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
188
189```kotlin
190import kotlinx.coroutines.*
191import kotlinx.coroutines.channels.*
192
193fun main() = runBlocking {
194//sampleStart
195    val numbers = produceNumbers() // produces integers from 1 and on
196    val squares = square(numbers) // squares integers
197    repeat(5) {
198        println(squares.receive()) // print first five
199    }
200    println("Done!") // we are done
201    coroutineContext.cancelChildren() // cancel children coroutines
202//sampleEnd
203}
204
205fun CoroutineScope.produceNumbers() = produce<Int> {
206    var x = 1
207    while (true) send(x++) // infinite stream of integers starting from 1
208}
209
210fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
211    for (x in numbers) send(x * x)
212}
213```
214
215</div>
216
217> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).
218
219<!--- TEST
2201
2214
2229
22316
22425
225Done!
226-->
227
228> All functions that create coroutines are defined as extensions on [CoroutineScope],
229so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
230sure that we don't have lingering global coroutines in our application.
231
232### Prime numbers with pipeline
233
234Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
235of coroutines. We start with an infinite sequence of numbers.
236
237<div class="sample" markdown="1" theme="idea" data-highlight-only>
238
239```kotlin
240fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
241    var x = start
242    while (true) send(x++) // infinite stream of integers from start
243}
244```
245
246</div>
247
248The following pipeline stage filters an incoming stream of numbers, removing all the numbers
249that are divisible by the given prime number:
250
251<div class="sample" markdown="1" theme="idea" data-highlight-only>
252
253```kotlin
254fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
255    for (x in numbers) if (x % prime != 0) send(x)
256}
257```
258
259</div>
260
261Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
262and launching new pipeline stage for each prime number found:
263
264```
265numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
266```
267
268The following example prints the first ten prime numbers,
269running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
270the scope of the main [runBlocking] coroutine
271we don't have to keep an explicit list of all the coroutines we have started.
272We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
273extension function to cancel all the children coroutines after we have printed
274the first ten prime numbers.
275
276<!--- CLEAR -->
277
278<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
279
280```kotlin
281import kotlinx.coroutines.*
282import kotlinx.coroutines.channels.*
283
284fun main() = runBlocking {
285//sampleStart
286    var cur = numbersFrom(2)
287    repeat(10) {
288        val prime = cur.receive()
289        println(prime)
290        cur = filter(cur, prime)
291    }
292    coroutineContext.cancelChildren() // cancel all children to let main finish
293//sampleEnd
294}
295
296fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
297    var x = start
298    while (true) send(x++) // infinite stream of integers from start
299}
300
301fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
302    for (x in numbers) if (x % prime != 0) send(x)
303}
304```
305
306</div>
307
308> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).
309
310The output of this code is:
311
312```text
3132
3143
3155
3167
31711
31813
31917
32019
32123
32229
323```
324
325<!--- TEST -->
326
327Note that you can build the same pipeline using
328[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
329coroutine builder from the standard library.
330Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
331`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
332However, the benefit of a pipeline that uses channels as shown above is that it can actually use
333multiple CPU cores if you run it in [Dispatchers.Default] context.
334
335Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
336other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
337built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
338`produce`, which is fully asynchronous.
339
340### Fan-out
341
342Multiple coroutines may receive from the same channel, distributing work between themselves.
343Let us start with a producer coroutine that is periodically producing integers
344(ten numbers per second):
345
346<div class="sample" markdown="1" theme="idea" data-highlight-only>
347
348```kotlin
349fun CoroutineScope.produceNumbers() = produce<Int> {
350    var x = 1 // start from 1
351    while (true) {
352        send(x++) // produce next
353        delay(100) // wait 0.1s
354    }
355}
356```
357
358</div>
359
360Then we can have several processor coroutines. In this example, they just print their id and
361received number:
362
363<div class="sample" markdown="1" theme="idea" data-highlight-only>
364
365```kotlin
366fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
367    for (msg in channel) {
368        println("Processor #$id received $msg")
369    }
370}
371```
372
373</div>
374
375Now let us launch five processors and let them work for almost a second. See what happens:
376
377<!--- CLEAR -->
378
379<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
380
381```kotlin
382import kotlinx.coroutines.*
383import kotlinx.coroutines.channels.*
384
385fun main() = runBlocking<Unit> {
386//sampleStart
387    val producer = produceNumbers()
388    repeat(5) { launchProcessor(it, producer) }
389    delay(950)
390    producer.cancel() // cancel producer coroutine and thus kill them all
391//sampleEnd
392}
393
394fun CoroutineScope.produceNumbers() = produce<Int> {
395    var x = 1 // start from 1
396    while (true) {
397        send(x++) // produce next
398        delay(100) // wait 0.1s
399    }
400}
401
402fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
403    for (msg in channel) {
404        println("Processor #$id received $msg")
405    }
406}
407```
408
409</div>
410
411> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).
412
413The output will be similar to the the following one, albeit the processor ids that receive
414each specific integer may be different:
415
416```
417Processor #2 received 1
418Processor #4 received 2
419Processor #0 received 3
420Processor #1 received 4
421Processor #3 received 5
422Processor #2 received 6
423Processor #4 received 7
424Processor #0 received 8
425Processor #1 received 9
426Processor #3 received 10
427```
428
429<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
430
431Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
432over the channel that processor coroutines are doing.
433
434Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code.
435Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
436coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
437always consumes (cancels) the underlying channel on its normal or abnormal completion.
438
439### Fan-in
440
441Multiple coroutines may send to the same channel.
442For example, let us have a channel of strings, and a suspending function that
443repeatedly sends a specified string to this channel with a specified delay:
444
445<div class="sample" markdown="1" theme="idea" data-highlight-only>
446
447```kotlin
448suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
449    while (true) {
450        delay(time)
451        channel.send(s)
452    }
453}
454```
455
456</div>
457
458Now, let us see what happens if we launch a couple of coroutines sending strings
459(in this example we launch them in the context of the main thread as main coroutine's children):
460
461<!--- CLEAR -->
462
463<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
464
465```kotlin
466import kotlinx.coroutines.*
467import kotlinx.coroutines.channels.*
468
469fun main() = runBlocking {
470//sampleStart
471    val channel = Channel<String>()
472    launch { sendString(channel, "foo", 200L) }
473    launch { sendString(channel, "BAR!", 500L) }
474    repeat(6) { // receive first six
475        println(channel.receive())
476    }
477    coroutineContext.cancelChildren() // cancel all children to let main finish
478//sampleEnd
479}
480
481suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
482    while (true) {
483        delay(time)
484        channel.send(s)
485    }
486}
487```
488
489</div>
490
491> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).
492
493The output is:
494
495```text
496foo
497foo
498BAR!
499foo
500foo
501BAR!
502```
503
504<!--- TEST -->
505
506### Buffered channels
507
508The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
509meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
510if receive is invoked first, it is suspended until send is invoked.
511
512Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
513specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
514similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
515
516Take a look at the behavior of the following code:
517
518
519<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
520
521```kotlin
522import kotlinx.coroutines.*
523import kotlinx.coroutines.channels.*
524
525fun main() = runBlocking<Unit> {
526//sampleStart
527    val channel = Channel<Int>(4) // create buffered channel
528    val sender = launch { // launch sender coroutine
529        repeat(10) {
530            println("Sending $it") // print before sending each element
531            channel.send(it) // will suspend when buffer is full
532        }
533    }
534    // don't receive anything... just wait....
535    delay(1000)
536    sender.cancel() // cancel sender coroutine
537//sampleEnd
538}
539```
540
541</div>
542
543> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).
544
545It prints "sending" _five_ times using a buffered channel with capacity of _four_:
546
547```text
548Sending 0
549Sending 1
550Sending 2
551Sending 3
552Sending 4
553```
554
555<!--- TEST -->
556
557The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
558
559### Channels are fair
560
561Send and receive operations to channels are _fair_ with respect to the order of their invocation from
562multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
563gets the element. In the following example two coroutines "ping" and "pong" are
564receiving the "ball" object from the shared "table" channel.
565
566
567<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
568
569```kotlin
570import kotlinx.coroutines.*
571import kotlinx.coroutines.channels.*
572
573//sampleStart
574data class Ball(var hits: Int)
575
576fun main() = runBlocking {
577    val table = Channel<Ball>() // a shared table
578    launch { player("ping", table) }
579    launch { player("pong", table) }
580    table.send(Ball(0)) // serve the ball
581    delay(1000) // delay 1 second
582    coroutineContext.cancelChildren() // game over, cancel them
583}
584
585suspend fun player(name: String, table: Channel<Ball>) {
586    for (ball in table) { // receive the ball in a loop
587        ball.hits++
588        println("$name $ball")
589        delay(300) // wait a bit
590        table.send(ball) // send the ball back
591    }
592}
593//sampleEnd
594```
595
596</div>
597
598> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).
599
600The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
601coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
602received by the "pong" coroutine, because it was already waiting for it:
603
604```text
605ping Ball(hits=1)
606pong Ball(hits=2)
607ping Ball(hits=3)
608pong Ball(hits=4)
609```
610
611<!--- TEST -->
612
613Note that sometimes channels may produce executions that look unfair due to the nature of the executor
614that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
615
616### Ticker channels
617
618Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
619Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
620pipelines and operators that do windowing and other time-dependent processing.
621Ticker channel can be used in [select] to perform "on tick" action.
622
623To create such channel use a factory method [ticker].
624To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
625
626Now let's see how it works in practice:
627
628<div class="sample" markdown="1" theme="idea" data-highlight-only>
629
630```kotlin
631import kotlinx.coroutines.*
632import kotlinx.coroutines.channels.*
633
634fun main() = runBlocking<Unit> {
635    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
636    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
637    println("Initial element is available immediately: $nextElement") // no initial delay
638
639    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
640    println("Next element is not ready in 50 ms: $nextElement")
641
642    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
643    println("Next element is ready in 100 ms: $nextElement")
644
645    // Emulate large consumption delays
646    println("Consumer pauses for 150ms")
647    delay(150)
648    // Next element is available immediately
649    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
650    println("Next element is available immediately after large consumer delay: $nextElement")
651    // Note that the pause between `receive` calls is taken into account and next element arrives faster
652    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
653    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
654
655    tickerChannel.cancel() // indicate that no more elements are needed
656}
657```
658
659</div>
660
661> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).
662
663It prints following lines:
664
665```text
666Initial element is available immediately: kotlin.Unit
667Next element is not ready in 50 ms: null
668Next element is ready in 100 ms: kotlin.Unit
669Consumer pauses for 150ms
670Next element is available immediately after large consumer delay: kotlin.Unit
671Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
672```
673
674<!--- TEST -->
675
676Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
677delay if a pause occurs, trying to maintain a fixed rate of produced elements.
678
679Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
680delay between elements.
681
682
683<!--- MODULE kotlinx-coroutines-core -->
684<!--- INDEX kotlinx.coroutines -->
685[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
686[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
687[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
688[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
689<!--- INDEX kotlinx.coroutines.channels -->
690[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
691[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
692[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
693[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
694[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
695[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
696[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
697[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
698[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
699[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
700<!--- INDEX kotlinx.coroutines.selects -->
701[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
702<!--- END -->
703