1<!--- INCLUDE .*/example-reactive-([a-z]+)-([0-9]+)\.kt 2/* 3 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 4 */ 5 6// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. 7package kotlinx.coroutines.rx2.guide.$$1$$2 8 9--> 10<!--- KNIT kotlinx-coroutines-rx2/test/guide/.*\.kt --> 11<!--- TEST_OUT kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt 12// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. 13package kotlinx.coroutines.rx2.guide.test 14 15import kotlinx.coroutines.guide.test.* 16import org.junit.Test 17 18class GuideReactiveTest : ReactiveTestBase() { 19--> 20 21# Guide to reactive streams with coroutines 22 23This guide explains the key differences between Kotlin coroutines and reactive streams and shows 24how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts 25that are covered in [Guide to kotlinx.coroutines](../docs/coroutines-guide.md) is not required, 26but is a big plus. If you are familiar with reactive streams, you may find this guide 27a better introduction into the world of coroutines. 28 29There are several modules in `kotlinx.coroutines` project that are related to reactive streams: 30 31* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive) -- utilities for [Reactive Streams](https://www.reactive-streams.org) 32* [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor) -- utilities for [Reactor](https://projectreactor.io) 33* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava) 34 35This guide is mostly based on [Reactive Streams](https://www.reactive-streams.org) specification and uses 36its `Publisher` interface with some examples based on [RxJava 2.x](https://github.com/ReactiveX/RxJava), 37which implements reactive streams specification. 38 39You are welcome to clone 40[`kotlinx.coroutines` project](https://github.com/Kotlin/kotlinx.coroutines) 41from GitHub to your workstation in order to 42run all the presented examples. They are contained in 43[reactive/kotlinx-coroutines-rx2/test/guide](kotlinx-coroutines-rx2/test/guide) 44directory of the project. 45 46## Table of contents 47 48<!--- TOC --> 49 50* [Differences between reactive streams and channels](#differences-between-reactive-streams-and-channels) 51 * [Basics of iteration](#basics-of-iteration) 52 * [Subscription and cancellation](#subscription-and-cancellation) 53 * [Backpressure](#backpressure) 54 * [Rx Subject vs BroadcastChannel](#rx-subject-vs-broadcastchannel) 55* [Operators](#operators) 56 * [Range](#range) 57 * [Fused filter-map hybrid](#fused-filter-map-hybrid) 58 * [Take until](#take-until) 59 * [Merge](#merge) 60* [Coroutine context](#coroutine-context) 61 * [Threads with Rx](#threads-with-rx) 62 * [Threads with coroutines](#threads-with-coroutines) 63 * [Rx observeOn](#rx-observeon) 64 * [Coroutine context to rule them all](#coroutine-context-to-rule-them-all) 65 * [Unconfined context](#unconfined-context) 66 67<!--- END_TOC --> 68 69## Differences between reactive streams and channels 70 71This section outlines key differences between reactive streams and coroutine-based channels. 72 73### Basics of iteration 74 75The [Channel] is somewhat similar concept to the following reactive stream classes: 76 77* Reactive stream [Publisher](https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java); 78* Rx Java 1.x [Observable](https://reactivex.io/RxJava/javadoc/rx/Observable.html); 79* Rx Java 2.x [Flowable](https://reactivex.io/RxJava/2.x/javadoc/), which implements `Publisher`. 80 81They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, 82and all of them support backpressure. 83 84However, the `Channel` always represents a _hot_ stream of items, using Rx terminology. Elements are being sent 85into the channel by producer coroutines and are received from it by consumer coroutines. 86Every [receive][ReceiveChannel.receive] invocation consumes an element from the channel. 87Let us illustrate it with the following example: 88 89<!--- INCLUDE 90import kotlinx.coroutines.* 91import kotlinx.coroutines.channels.* 92import kotlin.coroutines.* 93--> 94 95```kotlin 96fun main() = runBlocking<Unit> { 97 // create a channel that produces numbers from 1 to 3 with 200ms delays between them 98 val source = produce<Int> { 99 println("Begin") // mark the beginning of this coroutine in output 100 for (x in 1..3) { 101 delay(200) // wait for 200ms 102 send(x) // send number x to the channel 103 } 104 } 105 // print elements from the source 106 println("Elements:") 107 source.consumeEach { // consume elements from it 108 println(it) 109 } 110 // print elements from the source AGAIN 111 println("Again:") 112 source.consumeEach { // consume elements from it 113 println(it) 114 } 115} 116``` 117 118> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt). 119 120This code produces the following output: 121 122```text 123Elements: 124Begin 1251 1262 1273 128Again: 129``` 130 131<!--- TEST --> 132 133Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed, 134launches one coroutine to produce a stream of elements. All the produced elements are consumed 135with [ReceiveChannel.consumeEach][consumeEach] 136extension function. There is no way to receive the elements from this 137channel again. The channel is closed when the producer coroutine is over and an attempt to receive 138from it again cannot receive anything. 139 140Let us rewrite this code using the [publish] coroutine builder from `kotlinx-coroutines-reactive` module 141instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same, 142but where `source` used to have the [ReceiveChannel] type, it now has the reactive streams' 143[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html) 144type, and where [consumeEach] was used to _consume_ elements from the channel, 145now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher. 146 147<!--- INCLUDE 148import kotlinx.coroutines.* 149import kotlinx.coroutines.reactive.* 150import kotlin.coroutines.* 151--> 152 153```kotlin 154fun main() = runBlocking<Unit> { 155 // create a publisher that produces numbers from 1 to 3 with 200ms delays between them 156 val source = publish<Int> { 157 // ^^^^^^^ <--- Difference from the previous examples is here 158 println("Begin") // mark the beginning of this coroutine in output 159 for (x in 1..3) { 160 delay(200) // wait for 200ms 161 send(x) // send number x to the channel 162 } 163 } 164 // print elements from the source 165 println("Elements:") 166 source.collect { // collect elements from it 167 println(it) 168 } 169 // print elements from the source AGAIN 170 println("Again:") 171 source.collect { // collect elements from it 172 println(it) 173 } 174} 175``` 176 177> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt). 178 179Now the output of this code changes to: 180 181```text 182Elements: 183Begin 1841 1852 1863 187Again: 188Begin 1891 1902 1913 192``` 193 194<!--- TEST --> 195 196This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order 197functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of 198elements is produced. It becomes the actual stream of elements when _collected_. Each collector may receive the same or 199a different stream of elements, depending on how the corresponding implementation of `Publisher` works. 200 201The [publish] coroutine builder used in the above example does not launch a coroutine, 202but every [collect][org.reactivestreams.Publisher.collect] invocation does. 203There are two of them here and that is why we see "Begin" printed twice. 204 205In Rx lingo, this kind of publisher is called _cold_. Many standard Rx operators produce cold streams, too. We can collect 206them from a coroutine, and every collector gets the same stream of elements. 207 208> Note that we can replicate the same behaviour that we saw with channels by using Rx 209[publish](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish()) 210operator and [connect](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/flowables/ConnectableFlowable.html#connect()) 211method with it. 212 213### Subscription and cancellation 214 215In the second example from the previous section, `source.collect { ... }` was used to collect all elements. 216Instead, we can open a channel using [openSubscription][org.reactivestreams.Publisher.openSubscription] 217and iterate over it. In this way, we can have finer-grained control over our iteration 218(using `break`, for example), as shown below: 219 220<!--- INCLUDE 221import io.reactivex.* 222import kotlinx.coroutines.* 223import kotlinx.coroutines.channels.* 224import kotlinx.coroutines.reactive.* 225--> 226 227```kotlin 228fun main() = runBlocking<Unit> { 229 val source = Flowable.range(1, 5) // a range of five numbers 230 .doOnSubscribe { println("OnSubscribe") } // provide some insight 231 .doOnComplete { println("OnComplete") } // ... 232 .doFinally { println("Finally") } // ... into what's going on 233 var cnt = 0 234 source.openSubscription().consume { // open channel to the source 235 for (x in this) { // iterate over the channel to receive elements from it 236 println(x) 237 if (++cnt >= 3) break // break when 3 elements are printed 238 } 239 // Note: `consume` cancels the channel when this block of code is complete 240 } 241} 242``` 243 244> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt). 245 246It produces the following output: 247 248```text 249OnSubscribe 2501 2512 2523 253Finally 254``` 255 256<!--- TEST --> 257 258With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding 259subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly -- 260[consume] does that for us under the hood. 261The installed 262[doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action)) 263listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete" 264is never printed because we did not consume all of the elements. 265 266We do not need to use an explicit `cancel` either if we `collect` all the elements: 267 268<!--- INCLUDE 269import io.reactivex.* 270import kotlinx.coroutines.* 271import kotlinx.coroutines.reactive.* 272import kotlin.coroutines.* 273--> 274 275```kotlin 276fun main() = runBlocking<Unit> { 277 val source = Flowable.range(1, 5) // a range of five numbers 278 .doOnSubscribe { println("OnSubscribe") } // provide some insight 279 .doOnComplete { println("OnComplete") } // ... 280 .doFinally { println("Finally") } // ... into what's going on 281 // collect the source fully 282 source.collect { println(it) } 283} 284``` 285 286> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt). 287 288We get the following output: 289 290```text 291OnSubscribe 2921 2932 2943 295OnComplete 296Finally 2974 2985 299``` 300 301<!--- TEST --> 302 303Notice how "OnComplete" and "Finally" are printed before the lasts elements "4" and "5". 304It happens because our `main` function in this 305example is a coroutine that we start with the [runBlocking] coroutine builder. 306Our main coroutine receives on the flowable using the `source.collect { ... }` expression. 307The main coroutine is _suspended_ while it waits for the source to emit an item. 308When the last items are emitted by `Flowable.range(1, 5)` it 309_resumes_ the main coroutine, which gets dispatched onto the main thread to print this 310 last element at a later point in time, while the source completes and prints "Finally". 311 312### Backpressure 313 314Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can 315_suspend_ and they provide a natural answer to handling backpressure. 316 317In Rx Java 2.x, the backpressure-capable class is called 318[Flowable](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html). 319In the following example, we use [rxFlowable] coroutine builder from `kotlinx-coroutines-rx2` module to define a 320flowable that sends three integers from 1 to 3. 321It prints a message to the output before invocation of the 322suspending [send][SendChannel.send] function, so that we can study how it operates. 323 324The integers are generated in the context of the main thread, but the subscription is shifted 325to another thread using Rx 326[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int)) 327operator with a buffer of size 1. 328The subscriber is slow. It takes 500 ms to process each item, which is simulated using `Thread.sleep`. 329 330<!--- INCLUDE 331import io.reactivex.schedulers.* 332import kotlinx.coroutines.* 333import kotlinx.coroutines.rx2.* 334import kotlin.coroutines.* 335--> 336 337```kotlin 338fun main() = runBlocking<Unit> { 339 // coroutine -- fast producer of elements in the context of the main thread 340 val source = rxFlowable { 341 for (x in 1..3) { 342 send(x) // this is a suspending function 343 println("Sent $x") // print after successfully sent item 344 } 345 } 346 // subscribe on another thread with a slow subscriber using Rx 347 source 348 .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item 349 .doOnComplete { println("Complete") } 350 .subscribe { x -> 351 Thread.sleep(500) // 500ms to process each item 352 println("Processed $x") 353 } 354 delay(2000) // suspend the main thread for a few seconds 355} 356``` 357 358> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt). 359 360The output of this code nicely illustrates how backpressure works with coroutines: 361 362```text 363Sent 1 364Processed 1 365Sent 2 366Processed 2 367Sent 3 368Processed 3 369Complete 370``` 371 372<!--- TEST --> 373 374We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another 375one. Only after the consumer processes the first item, the producer sends the second one and resumes, etc. 376 377 378### Rx Subject vs BroadcastChannel 379 380RxJava has a concept of [Subject](https://github.com/ReactiveX/RxJava/wiki/Subject) which is an object that 381effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a 382[BroadcastChannel]. There is a variety of subjects in Rx with 383[BehaviorSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being 384the one used to manage state: 385 386<!--- INCLUDE 387import io.reactivex.subjects.BehaviorSubject 388--> 389 390```kotlin 391fun main() { 392 val subject = BehaviorSubject.create<String>() 393 subject.onNext("one") 394 subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost 395 // now subscribe to this subject and print everything 396 subject.subscribe(System.out::println) 397 subject.onNext("three") 398 subject.onNext("four") 399} 400``` 401 402> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt). 403 404This code prints the current state of the subject on subscription and all its further updates: 405 406 407```text 408two 409three 410four 411``` 412 413<!--- TEST --> 414 415You can subscribe to subjects from a coroutine just as with any other reactive stream: 416 417<!--- INCLUDE 418import io.reactivex.subjects.BehaviorSubject 419import kotlinx.coroutines.* 420import kotlinx.coroutines.rx2.collect 421--> 422 423```kotlin 424fun main() = runBlocking<Unit> { 425 val subject = BehaviorSubject.create<String>() 426 subject.onNext("one") 427 subject.onNext("two") 428 // now launch a coroutine to print everything 429 GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context 430 subject.collect { println(it) } 431 } 432 subject.onNext("three") 433 subject.onNext("four") 434} 435``` 436 437> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt). 438 439The result is the same: 440 441```text 442two 443three 444four 445``` 446 447<!--- TEST --> 448 449Here we use the [Dispatchers.Unconfined] coroutine context to launch a consuming coroutine with the same behavior as subscription in Rx. 450It basically means that the launched coroutine is going to be immediately executed in the same thread that 451is emitting elements. Contexts are covered in more details in a [separate section](#coroutine-context). 452 453The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. 454A typical UI application does not need to react to every state change. Only the most recent state is relevant. 455A sequence of back-to-back updates to the application state needs to get reflected in UI only once, 456as soon as the UI thread is free. For the following example we are going to simulate this by launching 457a consuming coroutine in the context of the main thread and use the [yield] function to simulate a break in the 458sequence of updates and to release the main thread: 459 460<!--- INCLUDE 461import io.reactivex.subjects.* 462import kotlinx.coroutines.* 463import kotlinx.coroutines.rx2.* 464import kotlin.coroutines.* 465--> 466 467```kotlin 468fun main() = runBlocking<Unit> { 469 val subject = BehaviorSubject.create<String>() 470 subject.onNext("one") 471 subject.onNext("two") 472 // now launch a coroutine to print the most recent update 473 launch { // use the context of the main thread for a coroutine 474 subject.collect { println(it) } 475 } 476 subject.onNext("three") 477 subject.onNext("four") 478 yield() // yield the main thread to the launched coroutine <--- HERE 479 subject.onComplete() // now complete the subject's sequence to cancel the consumer, too 480} 481``` 482 483> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt). 484 485Now the coroutine processes (prints) only the most recent update: 486 487```text 488four 489``` 490 491<!--- TEST --> 492 493The corresponding behavior in the pure coroutines world is implemented by [ConflatedBroadcastChannel] 494that provides the same logic on top of coroutine channels directly, 495without going through the bridge to the reactive streams: 496 497<!--- INCLUDE 498import kotlinx.coroutines.channels.* 499import kotlinx.coroutines.* 500import kotlin.coroutines.* 501--> 502 503```kotlin 504fun main() = runBlocking<Unit> { 505 val broadcast = ConflatedBroadcastChannel<String>() 506 broadcast.offer("one") 507 broadcast.offer("two") 508 // now launch a coroutine to print the most recent update 509 launch { // use the context of the main thread for a coroutine 510 broadcast.consumeEach { println(it) } 511 } 512 broadcast.offer("three") 513 broadcast.offer("four") 514 yield() // yield the main thread to the launched coroutine 515 broadcast.close() // now close the broadcast channel to cancel the consumer, too 516} 517``` 518 519> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt). 520 521It produces the same output as the previous example based on `BehaviorSubject`: 522 523```text 524four 525``` 526 527<!--- TEST --> 528 529Another implementation of [BroadcastChannel] is `ArrayBroadcastChannel` with an array-based buffer of 530a specified `capacity`. It can be created with `BroadcastChannel(capacity)`. 531It delivers every event to every 532subscriber as soon as their corresponding subscriptions are opened. It corresponds to 533[PublishSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html) in Rx. 534The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the numbers of elements 535that can be sent before the sender is suspended waiting for a receiver to receive those elements. 536 537## Operators 538 539Full-featured reactive stream libraries, like Rx, come with 540[a very large set of operators](https://reactivex.io/documentation/operators.html) to create, transform, combine 541and otherwise process the corresponding streams. Creating your own operators with support for 542back-pressure is [notoriously](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html) 543[difficult](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0). 544 545Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, 546but processing streams of elements is extremely simple and back-pressure is supported automatically 547without you having to explicitly think about it. 548 549This section shows a coroutine-based implementation of several reactive stream operators. 550 551### Range 552 553Let's roll out own implementation of 554[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) 555operator for the reactive streams' `Publisher` interface. The asynchronous clean-slate implementation of this operator for 556reactive streams is explained in 557[this blog post](https://akarnokd.blogspot.ru/2017/03/java-9-flow-api-asynchronous-integer.html). 558It takes a lot of code. 559Here is the corresponding code with coroutines: 560 561<!--- INCLUDE 562import kotlinx.coroutines.* 563import kotlinx.coroutines.reactive.* 564import kotlin.coroutines.CoroutineContext 565--> 566 567```kotlin 568fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) { 569 for (x in start until start + count) send(x) 570} 571``` 572 573Here, `CoroutineScope` and `context` are used instead of an `Executor` and all the backpressure aspects are taken care 574of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library 575that defines the `Publisher` interface and its friends. 576 577Using it from a coroutine is straightforward: 578 579```kotlin 580fun main() = runBlocking<Unit> { 581 // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default 582 range(Dispatchers.Default, 1, 5).collect { println(it) } 583} 584``` 585 586> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt). 587 588The result of this code is quite expected: 589 590```text 5911 5922 5933 5944 5955 596``` 597 598<!--- TEST --> 599 600### Fused filter-map hybrid 601 602Reactive operators like 603[filter](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#filter(io.reactivex.functions.Predicate)) and 604[map](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#map(io.reactivex.functions.Function)) 605are trivial to implement with coroutines. For a bit of challenge and showcase, let us combine them 606into the single `fusedFilterMap` operator: 607 608<!--- INCLUDE 609import kotlinx.coroutines.* 610import kotlinx.coroutines.reactive.* 611import org.reactivestreams.* 612import kotlin.coroutines.* 613--> 614 615```kotlin 616fun <T, R> Publisher<T>.fusedFilterMap( 617 context: CoroutineContext, // the context to execute this coroutine in 618 predicate: (T) -> Boolean, // the filter predicate 619 mapper: (T) -> R // the mapper function 620) = publish<R>(context) { 621 collect { // collect the source stream 622 if (predicate(it)) // filter part 623 send(mapper(it)) // map part 624 } 625} 626``` 627 628Using `range` from the previous example we can test our `fusedFilterMap` 629by filtering for even numbers and mapping them to strings: 630 631<!--- INCLUDE 632 633fun CoroutineScope.range(start: Int, count: Int) = publish<Int> { 634 for (x in start until start + count) send(x) 635} 636--> 637 638```kotlin 639fun main() = runBlocking<Unit> { 640 range(1, 5) 641 .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" }) 642 .collect { println(it) } // print all the resulting strings 643} 644``` 645 646> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt). 647 648It is not hard to see that the result is going to be: 649 650```text 6512 is even 6524 is even 653``` 654 655<!--- TEST --> 656 657### Take until 658 659Let's implement our own version of 660[takeUntil](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#takeUntil(org.reactivestreams.Publisher)) 661operator. It is quite [tricky](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html) 662as subscriptions to two streams need to be tracked and managed. 663We need to relay all the elements from the source stream until the other stream either completes or 664emits anything. However, we have the [select] expression to rescue us in the coroutines implementation: 665 666<!--- INCLUDE 667import kotlinx.coroutines.channels.* 668import kotlinx.coroutines.* 669import kotlinx.coroutines.reactive.* 670import kotlinx.coroutines.selects.* 671import org.reactivestreams.* 672import kotlin.coroutines.* 673--> 674 675```kotlin 676fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) { 677 this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T> 678 val current = this 679 other.openSubscription().consume { // explicitly open channel to Publisher<U> 680 val other = this 681 whileSelect { 682 other.onReceive { false } // bail out on any received element from `other` 683 current.onReceive { send(it); true } // resend element from this channel and continue 684 } 685 } 686 } 687} 688``` 689 690This code is using [whileSelect] as a nicer shortcut to `while(select{...}) {}` loop and Kotlin's 691[consume] expressions to close the channels on exit, which unsubscribes from the corresponding publishers. 692 693The following hand-written combination of 694[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) with 695[interval](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#interval(long,%20java.util.concurrent.TimeUnit,%20io.reactivex.Scheduler)) 696is used for testing. It is coded using a `publish` coroutine builder 697(its pure-Rx implementation is shown in later sections): 698 699```kotlin 700fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> { 701 for (x in start until start + count) { 702 delay(time) // wait before sending each number 703 send(x) 704 } 705} 706``` 707 708The following code shows how `takeUntil` works: 709 710```kotlin 711fun main() = runBlocking<Unit> { 712 val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval 713 val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms 714 slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it 715} 716``` 717 718> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt). 719 720Producing 721 722```text 7231 7242 725``` 726 727<!--- TEST --> 728 729### Merge 730 731There are always at least two ways for processing multiple streams of data with coroutines. One way involving 732[select] was shown in the previous example. The other way is just to launch multiple coroutines. Let 733us implement 734[merge](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#merge(org.reactivestreams.Publisher)) 735operator using the latter approach: 736 737<!--- INCLUDE 738import kotlinx.coroutines.* 739import kotlinx.coroutines.reactive.* 740import org.reactivestreams.* 741import kotlin.coroutines.* 742--> 743 744```kotlin 745fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) { 746 collect { pub -> // for each publisher collected 747 launch { // launch a child coroutine 748 pub.collect { send(it) } // resend all element from this publisher 749 } 750 } 751} 752``` 753 754Notice that all the coroutines that are 755being launched here are the children of the `publish` 756coroutine and will get cancelled when the `publish` coroutine is cancelled or is otherwise completed. 757Moreover, since the parent coroutine waits until all the children are complete, this implementation fully 758merges all the received streams. 759 760For a test, let us start with the `rangeWithInterval` function from the previous example and write a 761producer that sends its results twice with some delay: 762 763<!--- INCLUDE 764 765fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> { 766 for (x in start until start + count) { 767 delay(time) // wait before sending each number 768 send(x) 769 } 770} 771--> 772 773```kotlin 774fun CoroutineScope.testPub() = publish<Publisher<Int>> { 775 send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms 776 delay(100) // wait for 100 ms 777 send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms 778 delay(1100) // wait for 1.1s - done in 1.2 sec after start 779} 780``` 781 782The test code is to use `merge` on `testPub` and to display the results: 783 784```kotlin 785fun main() = runBlocking<Unit> { 786 testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream 787} 788``` 789 790> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt). 791 792And the results should be: 793 794```text 7951 7962 79711 7983 7994 80012 80113 802``` 803 804<!--- TEST --> 805 806## Coroutine context 807 808All the example operators that are shown in the previous section have an explicit 809[CoroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/) 810parameter. In the Rx world it roughly corresponds to 811a [Scheduler](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Scheduler.html). 812 813### Threads with Rx 814 815The following example shows the basics of threading context management with Rx. 816Here `rangeWithIntervalRx` is an implementation of `rangeWithInterval` function using Rx 817`zip`, `range`, and `interval` operators. 818 819<!--- INCLUDE 820import io.reactivex.* 821import io.reactivex.functions.BiFunction 822import io.reactivex.schedulers.Schedulers 823import java.util.concurrent.TimeUnit 824--> 825 826```kotlin 827fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 828 Flowable.zip( 829 Flowable.range(start, count), 830 Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), 831 BiFunction { x, _ -> x }) 832 833fun main() { 834 rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) 835 .subscribe { println("$it on thread ${Thread.currentThread().name}") } 836 Thread.sleep(1000) 837} 838``` 839 840> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt). 841 842We are explicitly passing the 843[Schedulers.computation()](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#computation()) 844scheduler to our `rangeWithIntervalRx` operator and 845it is going to be executed in Rx computation thread pool. The output is going to be similar to the following one: 846 847```text 8481 on thread RxComputationThreadPool-1 8492 on thread RxComputationThreadPool-1 8503 on thread RxComputationThreadPool-1 851``` 852 853<!--- TEST FLEXIBLE_THREAD --> 854 855### Threads with coroutines 856 857In the world of coroutines `Schedulers.computation()` roughly corresponds to [Dispatchers.Default], 858so the previous example is similar to the following one: 859 860<!--- INCLUDE 861import io.reactivex.* 862import kotlinx.coroutines.* 863import kotlinx.coroutines.reactive.* 864import kotlin.coroutines.CoroutineContext 865--> 866 867```kotlin 868fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) { 869 for (x in start until start + count) { 870 delay(time) // wait before sending each number 871 send(x) 872 } 873} 874 875fun main() { 876 Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) 877 .subscribe { println("$it on thread ${Thread.currentThread().name}") } 878 Thread.sleep(1000) 879} 880``` 881 882> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt). 883 884The produced output is going to be similar to: 885 886```text 8871 on thread ForkJoinPool.commonPool-worker-1 8882 on thread ForkJoinPool.commonPool-worker-1 8893 on thread ForkJoinPool.commonPool-worker-1 890``` 891 892<!--- TEST LINES_START --> 893 894Here we've used Rx 895[subscribe](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#subscribe(io.reactivex.functions.Consumer)) 896operator that does not have its own scheduler and operates on the same thread that the publisher -- on a default 897shared pool of threads in this example. 898 899### Rx observeOn 900 901In Rx you use special operators to modify the threading context for operations in the chain. You 902can find some [good guides](https://tomstechnicalblog.blogspot.ru/2016/02/rxjava-understanding-observeon-and.html) 903about them, if you are not familiar. 904 905For example, there is 906[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler)) 907operator. Let us modify the previous example to observe using `Schedulers.computation()`: 908 909<!--- INCLUDE 910import io.reactivex.* 911import kotlinx.coroutines.* 912import kotlinx.coroutines.reactive.* 913import io.reactivex.schedulers.Schedulers 914import kotlin.coroutines.CoroutineContext 915--> 916 917```kotlin 918fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) { 919 for (x in start until start + count) { 920 delay(time) // wait before sending each number 921 send(x) 922 } 923} 924 925fun main() { 926 Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) 927 .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED 928 .subscribe { println("$it on thread ${Thread.currentThread().name}") } 929 Thread.sleep(1000) 930} 931``` 932 933> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt). 934 935Here is the difference in output, notice "RxComputationThreadPool": 936 937```text 9381 on thread RxComputationThreadPool-1 9392 on thread RxComputationThreadPool-1 9403 on thread RxComputationThreadPool-1 941``` 942 943<!--- TEST FLEXIBLE_THREAD --> 944 945### Coroutine context to rule them all 946 947A coroutine is always working in some context. For example, let us start a coroutine 948in the main thread with [runBlocking] and iterate over the result of the Rx version of `rangeWithIntervalRx` operator, 949instead of using Rx `subscribe` operator: 950 951<!--- INCLUDE 952import io.reactivex.* 953import kotlinx.coroutines.* 954import kotlinx.coroutines.reactive.* 955import io.reactivex.functions.BiFunction 956import io.reactivex.schedulers.Schedulers 957import java.util.concurrent.TimeUnit 958--> 959 960```kotlin 961fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 962 Flowable.zip( 963 Flowable.range(start, count), 964 Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), 965 BiFunction { x, _ -> x }) 966 967fun main() = runBlocking<Unit> { 968 rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) 969 .collect { println("$it on thread ${Thread.currentThread().name}") } 970} 971``` 972 973> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt). 974 975The resulting messages are going to be printed in the main thread: 976 977```text 9781 on thread main 9792 on thread main 9803 on thread main 981``` 982 983<!--- TEST LINES_START --> 984 985### Unconfined context 986 987Most Rx operators do not have any specific thread (scheduler) associated with them and are working 988in whatever thread they happen to be invoked. We've seen it in the example with the `subscribe` operator 989in the [threads with Rx](#threads-with-rx) section. 990 991In the world of coroutines, [Dispatchers.Unconfined] context serves a similar role. Let us modify our previous example, 992but instead of iterating over the source `Flowable` from the `runBlocking` coroutine that is confined 993to the main thread, we launch a new coroutine in the `Dispatchers.Unconfined` context, while the main coroutine 994simply waits for its completion using [Job.join]: 995 996<!--- INCLUDE 997import io.reactivex.* 998import kotlinx.coroutines.* 999import kotlinx.coroutines.reactive.* 1000import io.reactivex.functions.BiFunction 1001import io.reactivex.schedulers.Schedulers 1002import java.util.concurrent.TimeUnit 1003--> 1004 1005```kotlin 1006fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 1007 Flowable.zip( 1008 Flowable.range(start, count), 1009 Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), 1010 BiFunction { x, _ -> x }) 1011 1012fun main() = runBlocking<Unit> { 1013 val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool) 1014 rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) 1015 .collect { println("$it on thread ${Thread.currentThread().name}") } 1016 } 1017 job.join() // wait for our coroutine to complete 1018} 1019``` 1020 1021> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt). 1022 1023Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just 1024like our initial example using the Rx `subscribe` operator. 1025 1026```text 10271 on thread RxComputationThreadPool-1 10282 on thread RxComputationThreadPool-1 10293 on thread RxComputationThreadPool-1 1030``` 1031 1032<!--- TEST LINES_START --> 1033 1034Note that the [Dispatchers.Unconfined] context should be used with care. It may improve the overall performance on certain tests, 1035due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks 1036and makes it harder to reason about asynchronicity of the code that is using it. 1037 1038If a coroutine sends an element to a channel, then the thread that invoked the 1039[send][SendChannel.send] may start executing the code of the coroutine with the [Dispatchers.Unconfined] dispatcher. 1040The original producer coroutine that invoked `send` is paused until the unconfined consumer coroutine hits its next 1041suspension point. This is very similar to a lock-step single-threaded `onNext` execution in the Rx world in the absense 1042of thread-shifting operators. It is a normal default for Rx, because operators are usually doing very small chunks 1043of work and you have to combine many operators for a complex processing. However, this is unusual with coroutines, 1044where you can have an arbitrary complex processing in a coroutine. Usually, you only need to chain stream-processing 1045coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines. 1046 1047<!--- MODULE kotlinx-coroutines-core --> 1048<!--- INDEX kotlinx.coroutines --> 1049[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html 1050[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html 1051[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html 1052[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html 1053[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html 1054<!--- INDEX kotlinx.coroutines.channels --> 1055[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html 1056[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html 1057[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html 1058[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html 1059[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html 1060[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html 1061[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html 1062[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html 1063[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html 1064[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html 1065<!--- INDEX kotlinx.coroutines.selects --> 1066[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html 1067[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html 1068<!--- MODULE kotlinx-coroutines-reactive --> 1069<!--- INDEX kotlinx.coroutines.reactive --> 1070[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html 1071[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html 1072[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html 1073<!--- MODULE kotlinx-coroutines-rx2 --> 1074<!--- INDEX kotlinx.coroutines.rx2 --> 1075[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html 1076<!--- END --> 1077 1078 1079