<!--- TEST_NAME SelectGuideTest -->

**Table of contents**

<!--- TOC -->

* [Select Expression (experimental)](#select-expression-experimental)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END -->

## Select Expression (experimental)

The select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

> Select expressions are an experimental feature of `kotlinx.coroutines`. Their API is expected to
evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
breaking changes.

### Selecting from channels

Consider two producers of strings, `fizz` and `buzz`. The `fizz` producer sends the string "Fizz" every 300 ms:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
```

</div>

The `buzz` producer sends the string "Buzz!" every 500 ms:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
```

</div>

Using the [receive][ReceiveChannel.receive] suspending function, we can receive from _either_ one channel or the
other. The [select] expression, however, allows us to receive from _both_ simultaneously using its
[onReceive][ReceiveChannel.onReceive] clauses:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

</div>

Let us run it seven times:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-01.kt).

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed, causing the corresponding
`select` to throw an exception. We can use the [onReceiveOrNull][onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

</div>

Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
for channels with non-nullable elements, so that there is no accidental confusion between a closed channel
and a null value.
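
As a side note, more recent releases of `kotlinx.coroutines` (assumed here to be 1.5 or later) deprecate `onReceiveOrNull` in favor of an `onReceiveCatching` clause, which delivers a `ChannelResult` and so also works for channels with nullable elements. The following is only a minimal sketch under that assumption. The function name `selectAorBCatching` is hypothetical and is not part of this guide's examples:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical variant of selectAorB, assuming kotlinx.coroutines 1.5 or later.
suspend fun selectAorBCatching(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveCatching { result ->
            val value = result.getOrNull() // null when the channel is closed
            if (value == null) "Channel 'a' is closed" else "a -> '$value'"
        }
        b.onReceiveCatching { result ->
            val value = result.getOrNull() // null when the channel is closed
            if (value == null) "Channel 'b' is closed" else "b -> '$value'"
        }
    }

fun main() = runBlocking<Unit> {
    val a = produce<String> { send("Hello") } // closes after a single element
    val b = Channel<String>()                 // never sends anything in this sketch
    println(selectAorBCatching(a, b))         // receives the only element of 'a'
    println(selectAorBCatching(a, b))         // 'a' is closed by now (or closes while we wait)
}
```

Because `b` never sends, only the clause for `a` can ever be selected here, first with the element and then with the closed result.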

Let's use `selectAorB` with a channel `a` that produces the "Hello" string four times and
a channel `b` that produces "World" four times:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }

fun main() = runBlocking<Unit> {
//sampleStart
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt).

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation, which gives `b` a chance to send, too.

The second observation is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
channel is already closed.
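
When the bias toward the first clause is not wanted, the `kotlinx.coroutines.selects` package also offers `selectUnbiased`, which randomly shuffles the clauses before checking which of them are selectable. The sketch below is only an illustration of that function. The name `selectAorBUnbiased` is hypothetical, and the output order varies from run to run, so none is shown:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical unbiased counterpart of selectAorB: clauses are shuffled on every call.
suspend fun selectAorBUnbiased(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    selectUnbiased<String> {
        a.onReceive { value -> "a -> '$value'" }
        b.onReceive { value -> "b -> '$value'" }
    }

fun main() = runBlocking<Unit> {
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    // Only four receives in total, so neither channel can be closed while we still select on it.
    repeat(4) {
        println(selectAorBUnbiased(a, b))
    }
    coroutineContext.cancelChildren() // cancel the producers that still have elements to send
}
```

The plain `onReceive` clause is safe here only because the loop stops before either producer can finish and close its channel.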

### Selecting to send

The select expression has an [onSend][SendChannel.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

</div>

The consumer is going to be quite slow, taking 250 ms to process each number:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val side = Channel<Int>() // allocate side channel
    launch { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-03.kt).

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

### Selecting deferred values

Deferred values can be selected using the [onAwait][Deferred.onAwait] clause.
Let us start with an async function that returns a deferred string value after
a given delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}
```

</div>

Let us start a dozen of them, each with a random delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

</div>

Now the main function waits for the first of them to complete and counts the number of deferred values
that are still active. Note that we rely here on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case, we iterate over a list
of deferred values to provide an `onAwait` clause for each of them.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import java.util.*

fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt).

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each received
deferred value, but only until the next deferred value arrives or the channel is closed. This example combines the
[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

</div>

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}
```

</div>

The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}

fun main() = runBlocking<Unit> {
//sampleStart
    val chan = Channel<Deferred<String>>() // the channel for test
    launch { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt).

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html
<!--- INDEX kotlinx.coroutines.channels -->
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->
