<!--- TEST_NAME SelectGuideTest -->

**Table of contents**

<!--- TOC -->

* [Select Expression (experimental)](#select-expression-experimental)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END -->

## Select Expression (experimental)

The select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

> Select expressions are an experimental feature of `kotlinx.coroutines`. Their API is expected to
evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
breaking changes.

### Selecting from channels

Let us have two producers of strings: `fizz` and `buzz`. The `fizz` producer sends the string "Fizz" every 300 ms:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
```

</div>

And the `buzz` producer sends the string "Buzz!" every 500 ms:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
```

</div>

Using the [receive][ReceiveChannel.receive] suspending function, we can receive _either_ from one channel or the
other.
But the [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][ReceiveChannel.onReceive] clauses:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

</div>

Let us run it seven times:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-01.kt).

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed, causing the corresponding
`select` to throw an exception. We can use the [onReceiveOrNull][onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

</div>

Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
for channels with non-nullable elements so that there is no accidental confusion between a closed channel
and a null value.

Let's use it with channel `a` that produces the "Hello" string four times and
channel `b` that produces "World" four times:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }

fun main() = runBlocking<Unit> {
//sampleStart
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt).

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make from it.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives `b` a chance to send, too.
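
The bias can also be seen in isolation, without the timing effects of unbuffered channels. The following is a minimal
sketch, not part of the guide's examples: the helper name `pickThreeBiased` is hypothetical, and the channels are assumed
to be created with `Channel.UNLIMITED` capacity so that `send` never suspends and both channels always have an element ready.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper (not from the guide): runs select three times over two
// channels that both already hold elements and returns what was picked.
suspend fun pickThreeBiased(): List<String> {
    // UNLIMITED capacity is an assumption of this sketch: send never suspends,
    // so both channels are selectable on every iteration below.
    val a = Channel<String>(Channel.UNLIMITED)
    val b = Channel<String>(Channel.UNLIMITED)
    repeat(3) { i ->
        a.send("a$i")
        b.send("b$i")
    }
    val picked = mutableListOf<String>()
    repeat(3) {
        picked += select<String> {
            a.onReceive { value -> value } // first clause: wins when both are ready
            b.onReceive { value -> value } // second clause: not reached in this sketch
        }
    }
    return picked
}

fun main() = runBlocking<Unit> {
    println(pickThreeBiased())
}
```

</div>

With the biased `select`, this sketch is expected to print `[a0, a1, a2]`, because the first clause is selectable every time.
The `kotlinx.coroutines.selects` package also provides `selectUnbiased`, which shuffles the clauses randomly before
checking them; swapping it in here would make the picked elements vary from run to run.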

The second observation is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

The select expression has an [onSend][SendChannel.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

</div>

The consumer is going to be quite slow, taking 250 ms to process each number:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val side = Channel<Int>() // allocate side channel
    launch { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-03.kt).

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

### Selecting deferred values

Deferred values can be selected using the [onAwait][Deferred.onAwait] clause.
Let us start with an async function that returns a deferred string value after
a given delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}
```

</div>

Let us start a dozen of them with a random delay.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

</div>

Now the main function awaits the first of them to complete and counts the number of deferred values
that are still active. Note that we rely here on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each deferred value.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import java.util.*

fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt).

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
deferred value, but only until the next deferred value comes over or the channel is closed.
This example puts together
[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

</div>

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}
```

</div>

The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}

fun main() = runBlocking<Unit> {
//sampleStart
    val chan = Channel<Deferred<String>>() // the channel for test
    launch { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt).

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html
<!--- INDEX kotlinx.coroutines.channels -->
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->