1<!--- INCLUDE .*/example-reactive-([a-z]+)-([0-9]+)\.kt
2/*
3 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
4 */
5
6// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
7package kotlinx.coroutines.rx2.guide.$$1$$2
8
9-->
10<!--- KNIT     kotlinx-coroutines-rx2/test/guide/.*\.kt -->
11<!--- TEST_OUT kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt
12// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
13package kotlinx.coroutines.rx2.guide.test
14
15import kotlinx.coroutines.guide.test.*
16import org.junit.Test
17
18class GuideReactiveTest : ReactiveTestBase() {
19-->
20
21# Guide to reactive streams with coroutines
22
23This guide explains the key differences between Kotlin coroutines and reactive streams and shows
24how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts
25that are covered in [Guide to kotlinx.coroutines](../docs/coroutines-guide.md) is not required,
but is a big plus. If you are already familiar with reactive streams, you may find this guide
a better introduction to the world of coroutines.
28
There are several modules in the `kotlinx.coroutines` project that are related to reactive streams:
30
31* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive) -- utilities for [Reactive Streams](https://www.reactive-streams.org)
32* [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor) -- utilities for [Reactor](https://projectreactor.io)
33* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
34
This guide is mostly based on the [Reactive Streams](https://www.reactive-streams.org) specification and uses
its `Publisher` interface with some examples based on [RxJava 2.x](https://github.com/ReactiveX/RxJava),
which implements the reactive streams specification.
38
39You are welcome to clone
40[`kotlinx.coroutines` project](https://github.com/Kotlin/kotlinx.coroutines)
41from GitHub to your workstation in order to
42run all the presented examples. They are contained in
43[reactive/kotlinx-coroutines-rx2/test/guide](kotlinx-coroutines-rx2/test/guide)
44directory of the project.
45
46## Table of contents
47
48<!--- TOC -->
49
50* [Differences between reactive streams and channels](#differences-between-reactive-streams-and-channels)
51  * [Basics of iteration](#basics-of-iteration)
52  * [Subscription and cancellation](#subscription-and-cancellation)
53  * [Backpressure](#backpressure)
54  * [Rx Subject vs BroadcastChannel](#rx-subject-vs-broadcastchannel)
55* [Operators](#operators)
56  * [Range](#range)
57  * [Fused filter-map hybrid](#fused-filter-map-hybrid)
58  * [Take until](#take-until)
59  * [Merge](#merge)
60* [Coroutine context](#coroutine-context)
61  * [Threads with Rx](#threads-with-rx)
62  * [Threads with coroutines](#threads-with-coroutines)
63  * [Rx observeOn](#rx-observeon)
64  * [Coroutine context to rule them all](#coroutine-context-to-rule-them-all)
65  * [Unconfined context](#unconfined-context)
66
67<!--- END_TOC -->
68
69## Differences between reactive streams and channels
70
71This section outlines key differences between reactive streams and coroutine-based channels.
72
73### Basics of iteration
74
A [Channel] is a concept somewhat similar to the following reactive stream classes:
76
77* Reactive stream [Publisher](https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java);
* RxJava 1.x [Observable](https://reactivex.io/RxJava/javadoc/rx/Observable.html);
* RxJava 2.x [Flowable](https://reactivex.io/RxJava/2.x/javadoc/), which implements `Publisher`.
80
81They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite,
82and all of them support backpressure.
83
84However, the `Channel` always represents a _hot_ stream of items, using Rx terminology. Elements are being sent
85into the channel by producer coroutines and are received from it by consumer coroutines.
86Every [receive][ReceiveChannel.receive] invocation consumes an element from the channel.
87Let us illustrate it with the following example:
88
89<!--- INCLUDE
90import kotlinx.coroutines.*
91import kotlinx.coroutines.channels.*
92import kotlin.coroutines.*
93-->
94
95```kotlin
96fun main() = runBlocking<Unit> {
97    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
98    val source = produce<Int> {
99        println("Begin") // mark the beginning of this coroutine in output
100        for (x in 1..3) {
101            delay(200) // wait for 200ms
102            send(x) // send number x to the channel
103        }
104    }
105    // print elements from the source
106    println("Elements:")
107    source.consumeEach { // consume elements from it
108        println(it)
109    }
110    // print elements from the source AGAIN
111    println("Again:")
112    source.consumeEach { // consume elements from it
113        println(it)
114    }
115}
116```
117
118> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt).
119
120This code produces the following output:
121
122```text
123Elements:
124Begin
1251
1262
1273
128Again:
129```
130
131<!--- TEST -->
132
Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed,
launches one coroutine to produce a stream of elements. All the produced elements are consumed
with the [ReceiveChannel.consumeEach][consumeEach]
extension function. There is no way to receive the elements from this
channel again. The channel is closed when the producer coroutine finishes, so a second attempt to consume
it receives nothing.
139
Let us rewrite this code using the [publish] coroutine builder from the `kotlinx-coroutines-reactive` module
instead of [produce] from the `kotlinx-coroutines-core` module. The code stays the same,
142but where `source` used to have the [ReceiveChannel] type, it now has the reactive streams'
143[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html)
144type, and where [consumeEach] was used to _consume_ elements from the channel,
145now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher.
146
147<!--- INCLUDE
148import kotlinx.coroutines.*
149import kotlinx.coroutines.reactive.*
150import kotlin.coroutines.*
151-->
152
153```kotlin
154fun main() = runBlocking<Unit> {
155    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
156    val source = publish<Int> {
    //           ^^^^^^^  <---  Difference from the previous example is here
158        println("Begin") // mark the beginning of this coroutine in output
159        for (x in 1..3) {
160            delay(200) // wait for 200ms
161            send(x) // send number x to the channel
162        }
163    }
164    // print elements from the source
165    println("Elements:")
166    source.collect { // collect elements from it
167        println(it)
168    }
169    // print elements from the source AGAIN
170    println("Again:")
171    source.collect { // collect elements from it
172        println(it)
173    }
174}
175```
176
177> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt).
178
179Now the output of this code changes to:
180
181```text
182Elements:
183Begin
1841
1852
1863
187Again:
188Begin
1891
1902
1913
192```
193
194<!--- TEST -->
195
196This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order
197functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of
198elements is produced. It becomes the actual stream of elements when _collected_. Each collector may receive the same or
199a different stream of elements, depending on how the corresponding implementation of `Publisher` works.
200
201The [publish] coroutine builder used in the above example does not launch a coroutine,
202but every [collect][org.reactivestreams.Publisher.collect] invocation does.
203There are two of them here and that is why we see "Begin" printed twice.
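
To make the "one coroutine per collector" point concrete, here is a minimal sketch of a `publish`-based source that counts how many times its block has been started. The names `countingSource`, `collectAll` and `launches` are made up for this illustration, and the code assumes the top-level `publish` builder and the `collect` extension from `kotlinx-coroutines-reactive` used in the example above:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: a cold publisher that tags every element with the
// number of the run (that is, of the collector) that produced it.
fun countingSource(launches: AtomicInteger): Publisher<String> = publish<String> {
    val n = launches.incrementAndGet() // executed once per collect call
    for (x in 1..2) send("run $n: $x")
}

// Hypothetical helper: collects everything the publisher emits into a list.
suspend fun collectAll(source: Publisher<String>): List<String> {
    val result = mutableListOf<String>()
    source.collect { result += it }
    return result
}

fun main() = runBlocking<Unit> {
    val launches = AtomicInteger()
    val source = countingSource(launches) // nothing is running yet
    println(collectAll(source)) // [run 1: 1, run 1: 2]
    println(collectAll(source)) // [run 2: 1, run 2: 2]
    println("Coroutines launched: ${launches.get()}") // Coroutines launched: 2
}
```

Creating `source` does not increment the counter. Only the two `collect` calls do, which is the same effect as the doubled "Begin" line.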
204
205In Rx lingo, this kind of publisher is called _cold_. Many standard Rx operators produce cold streams, too. We can collect
206them from a coroutine, and every collector gets the same stream of elements.
207
208> Note that we can replicate the same behaviour that we saw with channels by using Rx
209[publish](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish())
210operator and [connect](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/flowables/ConnectableFlowable.html#connect())
211method with it.
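
As a rough illustration of that note, the sketch below shares one upstream subscription between two subscribers using the RxJava 2.x `publish` and `connect` calls linked above. The helper name `sharedRunLog` and the log format are assumptions made for this example:

```kotlin
import io.reactivex.Flowable

// Hypothetical helper: records what happens when two subscribers share
// a single connection to the same source.
fun sharedRunLog(): List<String> {
    val log = mutableListOf<String>()
    val hot = Flowable.range(1, 3)
        .doOnSubscribe { log += "Begin" } // runs when connect() subscribes upstream
        .publish()                        // ConnectableFlowable: nothing is emitted yet
    hot.subscribe { log += "A: $it" }     // both subscribers wait for connect()
    hot.subscribe { log += "B: $it" }
    hot.connect()                         // start emitting to the current subscribers
    return log
}

fun main() {
    sharedRunLog().forEach { println(it) }
}
```

"Begin" should appear only once in the log even though there are two subscribers, which mirrors the single "Begin" line of the channel-based example.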
212
213### Subscription and cancellation
214
215In the second example from the previous section, `source.collect { ... }` was used to collect all elements.
216Instead, we can open a channel using [openSubscription][org.reactivestreams.Publisher.openSubscription]
217and iterate over it. In this way, we can have finer-grained control over our iteration
218(using `break`, for example), as shown below:
219
220<!--- INCLUDE
221import io.reactivex.*
222import kotlinx.coroutines.*
223import kotlinx.coroutines.channels.*
224import kotlinx.coroutines.reactive.*
225-->
226
227```kotlin
228fun main() = runBlocking<Unit> {
229    val source = Flowable.range(1, 5) // a range of five numbers
230        .doOnSubscribe { println("OnSubscribe") } // provide some insight
231        .doOnComplete { println("OnComplete") }   // ...
232        .doFinally { println("Finally") }         // ... into what's going on
233    var cnt = 0
234    source.openSubscription().consume { // open channel to the source
235        for (x in this) { // iterate over the channel to receive elements from it
236            println(x)
237            if (++cnt >= 3) break // break when 3 elements are printed
238        }
239        // Note: `consume` cancels the channel when this block of code is complete
240    }
241}
242```
243
244> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt).
245
246It produces the following output:
247
248```text
249OnSubscribe
2501
2512
2523
253Finally
254```
255
256<!--- TEST -->
257
258With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding
259subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly --
260[consume] does that for us under the hood.
261The installed
262[doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
263listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
264is never printed because we did not consume all of the elements.
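
For comparison, this is roughly what the explicit form could look like without `consume`. It is a sketch only: the helper name `firstThree` is made up, and it assumes the `openSubscription` extension is available in the `kotlinx-coroutines-reactive` version this guide targets.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Hypothetical helper: takes the first three elements and then unsubscribes by hand.
suspend fun firstThree(source: Publisher<Int>): List<Int> {
    val result = mutableListOf<Int>()
    val channel = source.openSubscription() // subscribe and expose the stream as a channel
    try {
        for (x in channel) {
            result += x
            if (result.size >= 3) break // stop early, before the source completes
        }
    } finally {
        channel.cancel() // unsubscribe from the source, even if the loop throws
    }
    return result
}

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5)
        .doFinally { println("Finally") } // confirms that the subscription was closed
    println(firstThree(source))
}
```

The `try`/`finally` pair is the part that `consume` takes care of, which is why the shorter form is usually preferable.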
265
266We do not need to use an explicit `cancel` either if we `collect` all the elements:
267
268<!--- INCLUDE
269import io.reactivex.*
270import kotlinx.coroutines.*
271import kotlinx.coroutines.reactive.*
272import kotlin.coroutines.*
273-->
274
275```kotlin
276fun main() = runBlocking<Unit> {
277    val source = Flowable.range(1, 5) // a range of five numbers
278        .doOnSubscribe { println("OnSubscribe") } // provide some insight
279        .doOnComplete { println("OnComplete") }   // ...
280        .doFinally { println("Finally") }         // ... into what's going on
281    // collect the source fully
282    source.collect { println(it) }
283}
284```
285
286> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt).
287
288We get the following output:
289
290```text
291OnSubscribe
2921
2932
2943
295OnComplete
296Finally
2974
2985
299```
300
301<!--- TEST -->
302
Notice how "OnComplete" and "Finally" are printed before the last elements "4" and "5".
This happens because our `main` function in this
example is a coroutine that we start with the [runBlocking] coroutine builder.
Our main coroutine receives from the flowable using the `source.collect { ... }` expression.
The main coroutine is _suspended_ while it waits for the source to emit an item.
When `Flowable.range(1, 5)` emits its last items, it
_resumes_ the main coroutine, which gets dispatched onto the main thread to print these
last elements at a later point in time, while the source completes and prints "Finally".
311
312### Backpressure
313
314Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can
315_suspend_ and they provide a natural answer to handling backpressure.
316
In RxJava 2.x, the backpressure-capable class is called
[Flowable](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html).
In the following example, we use the [rxFlowable] coroutine builder from the `kotlinx-coroutines-rx2` module to define a
flowable that sends three integers from 1 to 3.
321It prints a message to the output before invocation of the
322suspending [send][SendChannel.send] function, so that we can study how it operates.
323
324The integers are generated in the context of the main thread, but the subscription is shifted
325to another thread using Rx
326[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int))
327operator with a buffer of size 1.
328The subscriber is slow. It takes 500 ms to process each item, which is simulated using `Thread.sleep`.
329
330<!--- INCLUDE
331import io.reactivex.schedulers.*
332import kotlinx.coroutines.*
333import kotlinx.coroutines.rx2.*
334import kotlin.coroutines.*
335-->
336
337```kotlin
338fun main() = runBlocking<Unit> {
339    // coroutine -- fast producer of elements in the context of the main thread
340    val source = rxFlowable {
341        for (x in 1..3) {
342            send(x) // this is a suspending function
343            println("Sent $x") // print after successfully sent item
344        }
345    }
346    // subscribe on another thread with a slow subscriber using Rx
347    source
348        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
349        .doOnComplete { println("Complete") }
350        .subscribe { x ->
351            Thread.sleep(500) // 500ms to process each item
352            println("Processed $x")
353        }
    delay(2000) // suspend the main coroutine for two seconds
355}
356```
357
358> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt).
359
360The output of this code nicely illustrates how backpressure works with coroutines:
361
362```text
363Sent 1
364Processed 1
365Sent 2
366Processed 2
367Sent 3
368Processed 3
369Complete
370```
371
372<!--- TEST -->
373
We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another
one. Only after the consumer processes the first item does the producer resume and send the second one, and so on.
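
The same suspension-based backpressure can be observed without Rx at all. The following sketch uses a plain coroutine [Channel] with a buffer of one element. The helper name `backpressureLog` and the 100 ms processing delay are assumptions made for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: runs a fast producer against a slow consumer
// and returns the order of events.
suspend fun backpressureLog(): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(1) // buffer of one element
    launch {
        for (x in 1..3) {
            channel.send(x) // suspends while the buffer is full
            log += "Sent $x"
        }
        channel.close() // lets the consumer loop below terminate
    }
    for (x in channel) {
        delay(100) // simulate slow processing without blocking the thread
        log += "Processed $x"
    }
    log
}

fun main() = runBlocking<Unit> {
    backpressureLog().forEach { println(it) }
}
```

The producer can only get ahead of the consumer by what the channel can hold, so "Sent 3" can be logged only after "Processed 1". No extra backpressure protocol is involved: `send` simply suspends.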
376
377
378### Rx Subject vs BroadcastChannel
379
380RxJava has a concept of [Subject](https://github.com/ReactiveX/RxJava/wiki/Subject) which is an object that
381effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a
382[BroadcastChannel]. There is a variety of subjects in Rx with
383[BehaviorSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being
384the one used to manage state:
385
386<!--- INCLUDE
387import io.reactivex.subjects.BehaviorSubject
388-->
389
390```kotlin
391fun main() {
392    val subject = BehaviorSubject.create<String>()
393    subject.onNext("one")
394    subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
395    // now subscribe to this subject and print everything
396    subject.subscribe(System.out::println)
397    subject.onNext("three")
398    subject.onNext("four")
399}
400```
401
402> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt).
403
404This code prints the current state of the subject on subscription and all its further updates:
405
406
407```text
408two
409three
410four
411```
412
413<!--- TEST -->
414
415You can subscribe to subjects from a coroutine just as with any other reactive stream:
416
417<!--- INCLUDE
418import io.reactivex.subjects.BehaviorSubject
419import kotlinx.coroutines.*
420import kotlinx.coroutines.rx2.collect
421-->
422
423```kotlin
424fun main() = runBlocking<Unit> {
425    val subject = BehaviorSubject.create<String>()
426    subject.onNext("one")
427    subject.onNext("two")
428    // now launch a coroutine to print everything
429    GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
430        subject.collect { println(it) }
431    }
432    subject.onNext("three")
433    subject.onNext("four")
434}
435```
436
437> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt).
438
439The result is the same:
440
441```text
442two
443three
444four
445```
446
447<!--- TEST -->
448
449Here we use the [Dispatchers.Unconfined] coroutine context to launch a consuming coroutine with the same behavior as subscription in Rx.
450It basically means that the launched coroutine is going to be immediately executed in the same thread that
is emitting elements. Contexts are covered in more detail in a [separate section](#coroutine-context).
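
The "immediately executed" part can be checked in isolation. This small sketch (the helper name `launchOrder` is made up) records the order in which an unconfined coroutine, a normally dispatched coroutine, and the launching code run:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the order in which the three steps ran.
fun launchOrder(): List<String> {
    val order = mutableListOf<String>()
    runBlocking<Unit> {
        // Unconfined: the body runs right here, before launch returns
        launch(Dispatchers.Unconfined) { order += "unconfined" }
        // Inherited context: the body is queued until the main coroutine is done with the thread
        launch { order += "dispatched" }
        order += "main"
    }
    return order
}

fun main() {
    println(launchOrder()) // [unconfined, main, dispatched]
}
```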
452
453The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates.
454A typical UI application does not need to react to every state change. Only the most recent state is relevant.
455A sequence of back-to-back updates to the application state needs to get reflected in UI only once,
456as soon as the UI thread is free. For the following example we are going to simulate this by launching
457a consuming coroutine in the context of the main thread and use the [yield] function to simulate a break in the
458sequence of updates and to release the main thread:
459
460<!--- INCLUDE
461import io.reactivex.subjects.*
462import kotlinx.coroutines.*
463import kotlinx.coroutines.rx2.*
464import kotlin.coroutines.*
465-->
466
467```kotlin
468fun main() = runBlocking<Unit> {
469    val subject = BehaviorSubject.create<String>()
470    subject.onNext("one")
471    subject.onNext("two")
472    // now launch a coroutine to print the most recent update
473    launch { // use the context of the main thread for a coroutine
474        subject.collect { println(it) }
475    }
476    subject.onNext("three")
477    subject.onNext("four")
478    yield() // yield the main thread to the launched coroutine <--- HERE
479    subject.onComplete() // now complete the subject's sequence to cancel the consumer, too
480}
481```
482
483> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt).
484
485Now the coroutine processes (prints) only the most recent update:
486
487```text
488four
489```
490
491<!--- TEST -->
492
493The corresponding behavior in the pure coroutines world is implemented by [ConflatedBroadcastChannel]
494that provides the same logic on top of coroutine channels directly,
495without going through the bridge to the reactive streams:
496
497<!--- INCLUDE
498import kotlinx.coroutines.channels.*
499import kotlinx.coroutines.*
500import kotlin.coroutines.*
501-->
502
503```kotlin
504fun main() = runBlocking<Unit> {
505    val broadcast = ConflatedBroadcastChannel<String>()
506    broadcast.offer("one")
507    broadcast.offer("two")
508    // now launch a coroutine to print the most recent update
509    launch { // use the context of the main thread for a coroutine
510        broadcast.consumeEach { println(it) }
511    }
512    broadcast.offer("three")
513    broadcast.offer("four")
514    yield() // yield the main thread to the launched coroutine
515    broadcast.close() // now close the broadcast channel to cancel the consumer, too
516}
517```
518
519> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt).
520
521It produces the same output as the previous example based on `BehaviorSubject`:
522
523```text
524four
525```
526
527<!--- TEST -->
528
Another implementation of [BroadcastChannel] is `ArrayBroadcastChannel` with an array-based buffer of
a specified `capacity`. It can be created with `BroadcastChannel(capacity)`.
It delivers every event to every subscriber, starting from the moment
the corresponding subscription is opened. It corresponds to
[PublishSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html) in Rx.
The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the number of elements
that can be sent before the sender is suspended waiting for a receiver to receive those elements.
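
A small sketch of that behavior follows. It assumes the `BroadcastChannel(capacity)` factory from the version of `kotlinx-coroutines-core` this guide targets, and the helper name `lateSubscriberSees` is invented for the example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns what a subscriber sees when it subscribes
// after the first element has already been sent.
suspend fun lateSubscriberSees(): List<String> {
    val broadcast = BroadcastChannel<String>(4) // array-based buffer for 4 elements
    broadcast.send("one")                        // no subscribers yet, so nobody receives it
    val subscription = broadcast.openSubscription()
    broadcast.send("two")                        // buffered for the subscriber
    broadcast.send("three")
    broadcast.close()                            // lets consumeEach below terminate
    val seen = mutableListOf<String>()
    subscription.consumeEach { seen += it }
    return seen
}

fun main() = runBlocking<Unit> {
    println(lateSubscriberSees()) // [two, three]
}
```

Unlike the conflated variant shown earlier, the late subscriber does not get any previously sent value, which is the `PublishSubject`-like part.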
536
537## Operators
538
539Full-featured reactive stream libraries, like Rx, come with
540[a very large set of operators](https://reactivex.io/documentation/operators.html) to create, transform, combine
541and otherwise process the corresponding streams. Creating your own operators with support for
542back-pressure is [notoriously](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html)
543[difficult](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0).
544
545Coroutines and channels are designed to provide an opposite experience. There are no built-in operators,
546but processing streams of elements is extremely simple and back-pressure is supported automatically
547without you having to explicitly think about it.
548
549This section shows a coroutine-based implementation of several reactive stream operators.
550
551### Range
552
Let's roll our own implementation of
554[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int))
555operator for the reactive streams' `Publisher` interface. The asynchronous clean-slate implementation of this operator for
556reactive streams is explained in
557[this blog post](https://akarnokd.blogspot.ru/2017/03/java-9-flow-api-asynchronous-integer.html).
558It takes a lot of code.
559Here is the corresponding code with coroutines:
560
561<!--- INCLUDE
562import kotlinx.coroutines.*
563import kotlinx.coroutines.reactive.*
564import kotlin.coroutines.CoroutineContext
565-->
566
567```kotlin
568fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
569    for (x in start until start + count) send(x)
570}
571```
572
573Here, `CoroutineScope` and `context` are used instead of an `Executor` and all the backpressure aspects are taken care
574of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library
575that defines the `Publisher` interface and its friends.
576
577Using it from a coroutine is straightforward:
578
579```kotlin
580fun main() = runBlocking<Unit> {
581    // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default
582    range(Dispatchers.Default, 1, 5).collect { println(it) }
583}
584```
585
586> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt).
587
588The result of this code is quite expected:
589
590```text
5911
5922
5933
5944
5955
596```
597
598<!--- TEST -->
599
600### Fused filter-map hybrid
601
602Reactive operators like
603[filter](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#filter(io.reactivex.functions.Predicate)) and
604[map](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#map(io.reactivex.functions.Function))
605are trivial to implement with coroutines. For a bit of challenge and showcase, let us combine them
606into the single `fusedFilterMap` operator:
607
608<!--- INCLUDE
609import kotlinx.coroutines.*
610import kotlinx.coroutines.reactive.*
611import org.reactivestreams.*
612import kotlin.coroutines.*
613-->
614
615```kotlin
616fun <T, R> Publisher<T>.fusedFilterMap(
617    context: CoroutineContext,   // the context to execute this coroutine in
618    predicate: (T) -> Boolean,   // the filter predicate
619    mapper: (T) -> R             // the mapper function
620) = publish<R>(context) {
621    collect {                    // collect the source stream
622        if (predicate(it))       // filter part
623            send(mapper(it))     // map part
624    }
625}
626```
627
628Using `range` from the previous example we can test our `fusedFilterMap`
629by filtering for even numbers and mapping them to strings:
630
631<!--- INCLUDE
632
633fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
634    for (x in start until start + count) send(x)
635}
636-->
637
638```kotlin
639fun main() = runBlocking<Unit> {
640   range(1, 5)
641       .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
642       .collect { println(it) } // print all the resulting strings
643}
644```
645
646> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt).
647
648It is not hard to see that the result is going to be:
649
650```text
6512 is even
6524 is even
653```
654
655<!--- TEST -->
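
For comparison, the two separate operators mentioned at the start of this section could be sketched in the same style. The names `filterPub`, `mapPub` and `oneToFive` are hypothetical, chosen only to avoid confusion with the Rx operators:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import kotlin.coroutines.CoroutineContext

// Hypothetical filter: forwards only the elements that match the predicate.
fun <T : Any> Publisher<T>.filterPub(context: CoroutineContext, predicate: (T) -> Boolean) =
    publish<T>(context) {
        this@filterPub.collect { if (predicate(it)) send(it) }
    }

// Hypothetical map: forwards every element after transforming it.
fun <T : Any, R : Any> Publisher<T>.mapPub(context: CoroutineContext, mapper: (T) -> R) =
    publish<R>(context) {
        this@mapPub.collect { send(mapper(it)) }
    }

// Hypothetical test source: the numbers from 1 to 5.
fun oneToFive(): Publisher<Int> = publish<Int> { for (x in 1..5) send(x) }

fun main() = runBlocking<Unit> {
    oneToFive()
        .filterPub(Dispatchers.Unconfined) { it % 2 == 0 }
        .mapPub(Dispatchers.Unconfined) { "$it is even" }
        .collect { println(it) } // prints "2 is even" and "4 is even"
}
```

Chaining the two runs one coroutine per stage, while the fused version does the same work in a single coroutine.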
656
657### Take until
658
659Let's implement our own version of
660[takeUntil](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#takeUntil(org.reactivestreams.Publisher))
661operator. It is quite [tricky](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html)
662as subscriptions to two streams need to be tracked and managed.
663We need to relay all the elements from the source stream until the other stream either completes or
664emits anything. However, we have the [select] expression to rescue us in the coroutines implementation:
665
666<!--- INCLUDE
667import kotlinx.coroutines.channels.*
668import kotlinx.coroutines.*
669import kotlinx.coroutines.reactive.*
670import kotlinx.coroutines.selects.*
671import org.reactivestreams.*
672import kotlin.coroutines.*
673-->
674
675```kotlin
676fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
677    this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
678        val current = this
679        other.openSubscription().consume { // explicitly open channel to Publisher<U>
680            val other = this
681            whileSelect {
682                other.onReceive { false }            // bail out on any received element from `other`
683                current.onReceive { send(it); true } // resend element from this channel and continue
684            }
685        }
686    }
687}
688```
689
This code uses [whileSelect] as a nicer shortcut for the `while(select{...}) {}` loop and Kotlin's
[consume] expressions to close the channels on exit, which unsubscribes from the corresponding publishers.
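
To show what that shortcut stands for, here is the expanded `while (select { ... }) {}` form written against two plain channels. The helper name `relayUntilStopped` and the timings are assumptions made for this sketch:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: collects elements of `nums` until `stop` delivers anything.
suspend fun relayUntilStopped(nums: ReceiveChannel<Int>, stop: ReceiveChannel<Unit>): List<Int> {
    val relayed = mutableListOf<Int>()
    while (select<Boolean> {
            stop.onReceive { false }               // any element from `stop` ends the loop
            nums.onReceive { relayed += it; true } // keep the element and continue
        }) { /* empty body: all the work happens inside select */ }
    return relayed
}

fun main() = runBlocking<Unit> {
    // an endless stream of numbers, one every 200 ms
    val nums = produce<Int> { var x = 1; while (true) { delay(200); send(x++) } }
    // a single stop signal after 500 ms
    val stop = produce<Unit> { delay(500); send(Unit) }
    println(relayUntilStopped(nums, stop))
    nums.cancel() // the number producer is infinite, so stop it explicitly
}
```

With these timings the loop should relay the numbers sent at 200 ms and 400 ms and stop at 500 ms. The result of each `select` is the loop condition, which is exactly what `whileSelect` packages up.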
692
693The following hand-written combination of
694[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) with
695[interval](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#interval(long,%20java.util.concurrent.TimeUnit,%20io.reactivex.Scheduler))
696is used for testing. It is coded using a `publish` coroutine builder
697(its pure-Rx implementation is shown in later sections):
698
699```kotlin
700fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
701    for (x in start until start + count) {
702        delay(time) // wait before sending each number
703        send(x)
704    }
705}
706```
707
708The following code shows how `takeUntil` works:
709
710```kotlin
711fun main() = runBlocking<Unit> {
712    val slowNums = rangeWithInterval(200, 1, 10)         // numbers with 200ms interval
713    val stop = rangeWithInterval(500, 1, 10)             // the first one after 500ms
714    slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
715}
716```
717
718> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt).
719
It produces the following output:
721
722```text
7231
7242
725```
726
727<!--- TEST -->
728
729### Merge
730
There are always at least two ways of processing multiple streams of data with coroutines. One way, involving
[select], was shown in the previous example. The other way is just to launch multiple coroutines. Let
us implement the
[merge](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#merge(org.reactivestreams.Publisher))
operator using the latter approach:
736
737<!--- INCLUDE
738import kotlinx.coroutines.*
739import kotlinx.coroutines.reactive.*
740import org.reactivestreams.*
741import kotlin.coroutines.*
742-->
743
744```kotlin
745fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
746  collect { pub -> // for each publisher collected
747      launch {  // launch a child coroutine
          pub.collect { send(it) } // resend all elements from this publisher
749      }
750  }
751}
752```
753
754Notice that all the coroutines that are
755being launched here are the children of the `publish`
756coroutine and will get cancelled when the `publish` coroutine is cancelled or is otherwise completed.
757Moreover, since the parent coroutine waits until all the children are complete, this implementation fully
758merges all the received streams.
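
The same parent-waits-for-children behavior can be seen with plain channels. In this sketch, `mergeChannels` is a hypothetical channel-based analogue of `merge`, built on the `produce` builder instead of `publish`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical analogue of `merge` for channels: one child coroutine per source.
fun <T> CoroutineScope.mergeChannels(vararg sources: ReceiveChannel<T>): ReceiveChannel<T> =
    produce<T> {
        for (source in sources) {
            launch { for (x in source) send(x) } // child of the produce coroutine
        }
        // No explicit join here: the resulting channel is closed only after
        // the produce coroutine and all of its children have completed.
    }

fun main() = runBlocking<Unit> {
    val a = produce<Int> { for (x in 1..3) send(x) }
    val b = produce<Int> { for (x in 11..13) send(x) }
    val merged = mutableListOf<Int>()
    for (x in mergeChannels(a, b)) merged += x
    // The interleaving is not specified, so sort before printing.
    println(merged.sorted()) // [1, 2, 3, 11, 12, 13]
}
```

The `for` loop over the merged channel ends by itself because the channel is closed once every child has drained its source.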
759
760For a test, let us start with the `rangeWithInterval` function from the previous example and write a
761producer that sends its results twice with some delay:
762
763<!--- INCLUDE
764
765fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
766    for (x in start until start + count) {
767        delay(time) // wait before sending each number
768        send(x)
769    }
770}
771-->
772
773```kotlin
774fun CoroutineScope.testPub() = publish<Publisher<Int>> {
775    send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
776    delay(100) // wait for 100 ms
777    send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
778    delay(1100) // wait for 1.1s - done in 1.2 sec after start
779}
780```
781
782The test code is to use `merge` on `testPub` and to display the results:
783
784```kotlin
785fun main() = runBlocking<Unit> {
786    testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
787}
788```
789
790> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt).
791
792And the results should be:
793
794```text
7951
7962
79711
7983
7994
80012
80113
802```
803
804<!--- TEST -->
805
806## Coroutine context
807
808All the example operators that are shown in the previous section have an explicit
809[CoroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/)
810parameter. In the Rx world it roughly corresponds to
811a [Scheduler](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Scheduler.html).
812
813### Threads with Rx
814
815The following example shows the basics of threading context management with Rx.
816Here `rangeWithIntervalRx` is an implementation of `rangeWithInterval` function using Rx
817`zip`, `range`, and `interval` operators.
818
<!--- INCLUDE
import io.reactivex.*
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
-->

```kotlin
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt).

We explicitly pass the
[Schedulers.computation()](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#computation())
scheduler to our `rangeWithIntervalRx` operator, so
it is executed in the Rx computation thread pool. The output is going to be similar to the following:

```text
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
```

<!--- TEST FLEXIBLE_THREAD -->

### Threads with coroutines

In the world of coroutines, `Schedulers.computation()` roughly corresponds to [Dispatchers.Default],
so the previous example is similar to the following one:

<!--- INCLUDE
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import kotlin.coroutines.CoroutineContext
-->

```kotlin
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt).

The produced output is going to be similar to:

```text
1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1
```

<!--- TEST LINES_START -->

Here we've used the Rx
[subscribe](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#subscribe(io.reactivex.functions.Consumer))
operator, which does not have its own scheduler and operates on the same thread as the publisher: on a default
shared pool of threads in this example.

### Rx observeOn

In Rx, you use special operators to modify the threading context for operations in the chain. You
can find some [good guides](https://tomstechnicalblog.blogspot.ru/2016/02/rxjava-understanding-observeon-and.html)
about them if you are not familiar with these operators.

For example, there is the
[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler))
operator. Let us modify the previous example to observe using `Schedulers.computation()`:

<!--- INCLUDE
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import io.reactivex.schedulers.Schedulers
import kotlin.coroutines.CoroutineContext
-->

```kotlin
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .observeOn(Schedulers.computation())                           // <-- THIS LINE IS ADDED
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt).

Here is the difference in the output; notice "RxComputationThreadPool":

```text
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
```

<!--- TEST FLEXIBLE_THREAD -->

### Coroutine context to rule them all

A coroutine always works in some context. For example, let us start a coroutine
in the main thread with [runBlocking] and iterate over the result of the `rangeWithIntervalRx` operator
(the Rx version of `rangeWithInterval`) instead of using the Rx `subscribe` operator:

<!--- INCLUDE
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
-->

```kotlin
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .collect { println("$it on thread ${Thread.currentThread().name}") }
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt).

The resulting messages are going to be printed in the main thread:

```text
1 on thread main
2 on thread main
3 on thread main
```

<!--- TEST LINES_START -->

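The same idea applies to any other context: the context of the collecting coroutine decides where the messages
are printed, which is roughly what `observeOn` does in Rx. As a minimal sketch, assuming the same
`rangeWithIntervalRx` function as above, the collection can be moved to [Dispatchers.Default] with `withContext`:

```kotlin
import io.reactivex.*
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.TimeUnit

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) { // collect in the default dispatcher instead of the main thread
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
}
```

<!--- CLEAR -->

The messages are then expected to be printed from a worker thread of the default dispatcher rather than from `main`,
even though the numbers themselves are still produced in the Rx computation thread pool.
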
### Unconfined context

Most Rx operators do not have any specific thread (scheduler) associated with them and work
in whatever thread they happen to be invoked in. We've seen it in the example with the `subscribe` operator
in the [threads with Rx](#threads-with-rx) section.

In the world of coroutines, the [Dispatchers.Unconfined] context serves a similar role. Let us modify our previous example,
but instead of iterating over the source `Flowable` from the `runBlocking` coroutine that is confined
to the main thread, we launch a new coroutine in the `Dispatchers.Unconfined` context, while the main coroutine
simply waits for its completion using [Job.join]:

<!--- INCLUDE
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
-->

```kotlin
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // wait for our coroutine to complete
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt).

Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just
like our initial example using the Rx `subscribe` operator.

```text
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
```

<!--- TEST LINES_START -->

Note that the [Dispatchers.Unconfined] context should be used with care. It may improve the overall performance on certain tests
due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks
and makes it harder to reason about the asynchronicity of the code that uses it.

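The difficulty can be seen even without any reactive streams. In the following sketch, which uses only
`kotlinx-coroutines-core`, an unconfined coroutine starts in the caller's thread and, after its first suspension,
continues in whatever thread resumed it:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        // Runs immediately in the thread that called launch
        println("before delay on thread ${Thread.currentThread().name}")
        delay(100) // suspension point: the coroutine is resumed by the thread that handles the timer
        println("after delay on thread ${Thread.currentThread().name}")
    }
}
```

<!--- CLEAR -->

The first message is expected to name the `main` thread, while the second one names an internal thread
that services `delay`; its exact name is an implementation detail of the library.
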
If a coroutine sends an element to a channel, then the thread that invoked
[send][SendChannel.send] may start executing the code of the consumer coroutine that uses the [Dispatchers.Unconfined] dispatcher.
The original producer coroutine that invoked `send` is paused until the unconfined consumer coroutine hits its next
suspension point. This is very similar to the lock-step single-threaded `onNext` execution in the Rx world in the absence
of thread-shifting operators. It is a normal default for Rx, because operators usually do very small chunks
of work and you have to combine many operators for complex processing. However, this is unusual with coroutines,
where you can have arbitrarily complex processing in a coroutine. Usually, you only need to chain stream-processing
coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.

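The lock-step behaviour described above can be observed with a small experiment. This is only a sketch
under stated assumptions: it uses a rendezvous [Channel] from `kotlinx-coroutines-core`, a consumer launched in
[Dispatchers.Unconfined], and a producer that runs in [Dispatchers.Default]; the thread names it prints
depend on the library version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>() // rendezvous channel without a buffer
    val consumer = launch(Dispatchers.Unconfined) {
        for (x in channel) { // suspends until the producer sends the next element
            println("received $x on thread ${Thread.currentThread().name}")
        }
    }
    withContext(Dispatchers.Default) { // the producer runs in the default dispatcher
        for (x in 1..3) {
            println("sending $x on thread ${Thread.currentThread().name}")
            channel.send(x) // the unconfined consumer may run right here, inside send
        }
    }
    channel.close() // lets the consumer loop finish
    consumer.join()
}
```

<!--- CLEAR -->

Each `received` message is expected to show the same thread as the `sending` message before it, because the
unconfined consumer is resumed by the thread that invoked `send` and runs there until its next suspension point.
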
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
<!--- INDEX kotlinx.coroutines.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
<!--- END -->