• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1<!--- TEST_NAME ChannelsGuideTest -->
2
3[//]: # (title: Channels)
4
5Deferred values provide a convenient way to transfer a single value between coroutines.
6Channels provide a way to transfer a stream of values.
7
8## Channel basics
9
10A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
11instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
12a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
13
14```kotlin
15import kotlinx.coroutines.*
16import kotlinx.coroutines.channels.*
17
18fun main() = runBlocking {
19//sampleStart
20    val channel = Channel<Int>()
21    launch {
22        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
23        for (x in 1..5) channel.send(x * x)
24    }
25    // here we print five received integers:
26    repeat(5) { println(channel.receive()) }
27    println("Done!")
28//sampleEnd
29}
30```
31{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
32
33> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).
34>
35{type="note"}
36
37The output of this code is:
38
39```text
401
414
429
4316
4425
45Done!
46```
47
48<!--- TEST -->
49
50## Closing and iteration over channels
51
52Unlike a queue, a channel can be closed to indicate that no more elements are coming.
53On the receiver side it is convenient to use a regular `for` loop to receive elements
54from the channel.
55
56Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
57The iteration stops as soon as this close token is received, so there is a guarantee
58that all previously sent elements before the close are received:
59
60```kotlin
61import kotlinx.coroutines.*
62import kotlinx.coroutines.channels.*
63
64fun main() = runBlocking {
65//sampleStart
66    val channel = Channel<Int>()
67    launch {
68        for (x in 1..5) channel.send(x * x)
69        channel.close() // we're done sending
70    }
71    // here we print received values using `for` loop (until the channel is closed)
72    for (y in channel) println(y)
73    println("Done!")
74//sampleEnd
75}
76```
77{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
78
79> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).
80>
81{type="note"}
82
83<!--- TEST
841
854
869
8716
8825
89Done!
90-->
91
92## Building channel producers
93
94The pattern where a coroutine is producing a sequence of elements is quite common.
95This is a part of _producer-consumer_ pattern that is often found in concurrent code.
96You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
97to common sense that results must be returned from functions.
98
99There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side,
100and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
101
102```kotlin
103import kotlinx.coroutines.*
104import kotlinx.coroutines.channels.*
105
106fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
107    for (x in 1..5) send(x * x)
108}
109
110fun main() = runBlocking {
111//sampleStart
112    val squares = produceSquares()
113    squares.consumeEach { println(it) }
114    println("Done!")
115//sampleEnd
116}
117```
118{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
119
120> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).
121>
122{type="note"}
123
124<!--- TEST
1251
1264
1279
12816
12925
130Done!
131-->
132
133## Pipelines
134
135A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
136
137```kotlin
138fun CoroutineScope.produceNumbers() = produce<Int> {
139    var x = 1
140    while (true) send(x++) // infinite stream of integers starting from 1
141}
142```
143
144And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
145In the example below, the numbers are just squared:
146
147```kotlin
148fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
149    for (x in numbers) send(x * x)
150}
151```
152
153The main code starts and connects the whole pipeline:
154
155<!--- CLEAR -->
156
157```kotlin
158import kotlinx.coroutines.*
159import kotlinx.coroutines.channels.*
160
161fun main() = runBlocking {
162//sampleStart
163    val numbers = produceNumbers() // produces integers from 1 and on
164    val squares = square(numbers) // squares integers
165    repeat(5) {
166        println(squares.receive()) // print first five
167    }
168    println("Done!") // we are done
169    coroutineContext.cancelChildren() // cancel children coroutines
170//sampleEnd
171}
172
173fun CoroutineScope.produceNumbers() = produce<Int> {
174    var x = 1
175    while (true) send(x++) // infinite stream of integers starting from 1
176}
177
178fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
179    for (x in numbers) send(x * x)
180}
181```
182{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
183
184> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).
185>
186{type="note"}
187
188<!--- TEST
1891
1904
1919
19216
19325
194Done!
195-->
196
197> All functions that create coroutines are defined as extensions on [CoroutineScope],
198> so that we can rely on [structured concurrency](composing-suspending-functions.md#structured-concurrency-with-async) to make
199> sure that we don't have lingering global coroutines in our application.
200>
201{type="note"}
202
203## Prime numbers with pipeline
204
205Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
206of coroutines. We start with an infinite sequence of numbers.
207
208```kotlin
209fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
210    var x = start
211    while (true) send(x++) // infinite stream of integers from start
212}
213```
214
215The following pipeline stage filters an incoming stream of numbers, removing all the numbers
216that are divisible by the given prime number:
217
218```kotlin
219fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
220    for (x in numbers) if (x % prime != 0) send(x)
221}
222```
223
224Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
225and launching new pipeline stage for each prime number found:
226
227```Plain Text
228numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
229```
230
231The following example prints the first ten prime numbers,
232running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
233the scope of the main [runBlocking] coroutine
234we don't have to keep an explicit list of all the coroutines we have started.
235We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
236extension function to cancel all the children coroutines after we have printed
237the first ten prime numbers.
238
239<!--- CLEAR -->
240
241```kotlin
242import kotlinx.coroutines.*
243import kotlinx.coroutines.channels.*
244
245fun main() = runBlocking {
246//sampleStart
247    var cur = numbersFrom(2)
248    repeat(10) {
249        val prime = cur.receive()
250        println(prime)
251        cur = filter(cur, prime)
252    }
253    coroutineContext.cancelChildren() // cancel all children to let main finish
254//sampleEnd
255}
256
257fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
258    var x = start
259    while (true) send(x++) // infinite stream of integers from start
260}
261
262fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
263    for (x in numbers) if (x % prime != 0) send(x)
264}
265```
266{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
267
268> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).
269>
270{type="note"}
271
272The output of this code is:
273
274```text
2752
2763
2775
2787
27911
28013
28117
28219
28323
28429
285```
286
287<!--- TEST -->
288
289Note that you can build the same pipeline using
290[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
291coroutine builder from the standard library.
292Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
293`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
294However, the benefit of a pipeline that uses channels as shown above is that it can actually use
295multiple CPU cores if you run it in [Dispatchers.Default] context.
296
297Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
298other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
299built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
300`produce`, which is fully asynchronous.
301
302## Fan-out
303
304Multiple coroutines may receive from the same channel, distributing work between themselves.
305Let us start with a producer coroutine that is periodically producing integers
306(ten numbers per second):
307
308```kotlin
309fun CoroutineScope.produceNumbers() = produce<Int> {
310    var x = 1 // start from 1
311    while (true) {
312        send(x++) // produce next
313        delay(100) // wait 0.1s
314    }
315}
316```
317
318Then we can have several processor coroutines. In this example, they just print their id and
319received number:
320
321```kotlin
322fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
323    for (msg in channel) {
324        println("Processor #$id received $msg")
325    }
326}
327```
328
329Now let us launch five processors and let them work for almost a second. See what happens:
330
331<!--- CLEAR -->
332
333```kotlin
334import kotlinx.coroutines.*
335import kotlinx.coroutines.channels.*
336
337fun main() = runBlocking<Unit> {
338//sampleStart
339    val producer = produceNumbers()
340    repeat(5) { launchProcessor(it, producer) }
341    delay(950)
342    producer.cancel() // cancel producer coroutine and thus kill them all
343//sampleEnd
344}
345
346fun CoroutineScope.produceNumbers() = produce<Int> {
347    var x = 1 // start from 1
348    while (true) {
349        send(x++) // produce next
350        delay(100) // wait 0.1s
351    }
352}
353
354fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
355    for (msg in channel) {
356        println("Processor #$id received $msg")
357    }
358}
359```
360{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
361
362> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).
363>
364{type="note"}
365
366The output will be similar to the the following one, albeit the processor ids that receive
367each specific integer may be different:
368
369```text
370Processor #2 received 1
371Processor #4 received 2
372Processor #0 received 3
373Processor #1 received 4
374Processor #3 received 5
375Processor #2 received 6
376Processor #4 received 7
377Processor #0 received 8
378Processor #1 received 9
379Processor #3 received 10
380```
381
382<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
383
384Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
385over the channel that processor coroutines are doing.
386
387Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code.
388Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
389coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
390always consumes (cancels) the underlying channel on its normal or abnormal completion.
391
392## Fan-in
393
394Multiple coroutines may send to the same channel.
395For example, let us have a channel of strings, and a suspending function that
396repeatedly sends a specified string to this channel with a specified delay:
397
398```kotlin
399suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
400    while (true) {
401        delay(time)
402        channel.send(s)
403    }
404}
405```
406
407Now, let us see what happens if we launch a couple of coroutines sending strings
408(in this example we launch them in the context of the main thread as main coroutine's children):
409
410<!--- CLEAR -->
411
412```kotlin
413import kotlinx.coroutines.*
414import kotlinx.coroutines.channels.*
415
416fun main() = runBlocking {
417//sampleStart
418    val channel = Channel<String>()
419    launch { sendString(channel, "foo", 200L) }
420    launch { sendString(channel, "BAR!", 500L) }
421    repeat(6) { // receive first six
422        println(channel.receive())
423    }
424    coroutineContext.cancelChildren() // cancel all children to let main finish
425//sampleEnd
426}
427
428suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
429    while (true) {
430        delay(time)
431        channel.send(s)
432    }
433}
434```
435{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
436
437> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).
438>
439{type="note"}
440
441The output is:
442
443```text
444foo
445foo
446BAR!
447foo
448foo
449BAR!
450```
451
452<!--- TEST -->
453
454## Buffered channels
455
456The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
457meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
458if receive is invoked first, it is suspended until send is invoked.
459
460Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
461specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
462similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
463
464Take a look at the behavior of the following code:
465
466```kotlin
467import kotlinx.coroutines.*
468import kotlinx.coroutines.channels.*
469
470fun main() = runBlocking<Unit> {
471//sampleStart
472    val channel = Channel<Int>(4) // create buffered channel
473    val sender = launch { // launch sender coroutine
474        repeat(10) {
475            println("Sending $it") // print before sending each element
476            channel.send(it) // will suspend when buffer is full
477        }
478    }
479    // don't receive anything... just wait....
480    delay(1000)
481    sender.cancel() // cancel sender coroutine
482//sampleEnd
483}
484```
485{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
486
487> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).
488>
489{type="note"}
490
491It prints "sending" _five_ times using a buffered channel with capacity of _four_:
492
493```text
494Sending 0
495Sending 1
496Sending 2
497Sending 3
498Sending 4
499```
500
501<!--- TEST -->
502
503The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
504
505## Channels are fair
506
507Send and receive operations to channels are _fair_ with respect to the order of their invocation from
508multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
509gets the element. In the following example two coroutines "ping" and "pong" are
510receiving the "ball" object from the shared "table" channel.
511
512```kotlin
513import kotlinx.coroutines.*
514import kotlinx.coroutines.channels.*
515
516//sampleStart
517data class Ball(var hits: Int)
518
519fun main() = runBlocking {
520    val table = Channel<Ball>() // a shared table
521    launch { player("ping", table) }
522    launch { player("pong", table) }
523    table.send(Ball(0)) // serve the ball
524    delay(1000) // delay 1 second
525    coroutineContext.cancelChildren() // game over, cancel them
526}
527
528suspend fun player(name: String, table: Channel<Ball>) {
529    for (ball in table) { // receive the ball in a loop
530        ball.hits++
531        println("$name $ball")
532        delay(300) // wait a bit
533        table.send(ball) // send the ball back
534    }
535}
536//sampleEnd
537```
538{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
539
540> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).
541>
542{type="note"}
543
544The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
545coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
546received by the "pong" coroutine, because it was already waiting for it:
547
548```text
549ping Ball(hits=1)
550pong Ball(hits=2)
551ping Ball(hits=3)
552pong Ball(hits=4)
553```
554
555<!--- TEST -->
556
557Note that sometimes channels may produce executions that look unfair due to the nature of the executor
558that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
559
560## Ticker channels
561
562Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
563Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
564pipelines and operators that do windowing and other time-dependent processing.
565Ticker channel can be used in [select] to perform "on tick" action.
566
567To create such channel use a factory method [ticker].
568To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
569
570Now let's see how it works in practice:
571
572```kotlin
573import kotlinx.coroutines.*
574import kotlinx.coroutines.channels.*
575
576//sampleStart
577fun main() = runBlocking<Unit> {
578    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
579    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
580    println("Initial element is available immediately: $nextElement") // no initial delay
581
582    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
583    println("Next element is not ready in 50 ms: $nextElement")
584
585    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
586    println("Next element is ready in 100 ms: $nextElement")
587
588    // Emulate large consumption delays
589    println("Consumer pauses for 150ms")
590    delay(150)
591    // Next element is available immediately
592    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
593    println("Next element is available immediately after large consumer delay: $nextElement")
594    // Note that the pause between `receive` calls is taken into account and next element arrives faster
595    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
596    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
597
598    tickerChannel.cancel() // indicate that no more elements are needed
599}
600//sampleEnd
601```
602{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
603
604> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).
605>
606{type="note"}
607
608It prints following lines:
609
610```text
611Initial element is available immediately: kotlin.Unit
612Next element is not ready in 50 ms: null
613Next element is ready in 100 ms: kotlin.Unit
614Consumer pauses for 150ms
615Next element is available immediately after large consumer delay: kotlin.Unit
616Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
617```
618
619<!--- TEST -->
620
621Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
622delay if a pause occurs, trying to maintain a fixed rate of produced elements.
623
624Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
625delay between elements.
626
627<!--- MODULE kotlinx-coroutines-core -->
628<!--- INDEX kotlinx.coroutines -->
629
630[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
631[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
632[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-children.html
633[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
634
635<!--- INDEX kotlinx.coroutines.channels -->
636
637[Channel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
638[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
639[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
640[SendChannel.close]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
641[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
642[consumeEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
643[Channel()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
644[ticker]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
645[ReceiveChannel.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
646[TickerMode.FIXED_DELAY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y/index.html
647
648<!--- INDEX kotlinx.coroutines.selects -->
649
650[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
651
652<!--- END -->
653