• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
6 package kotlinx.coroutines.rx2.guide.operators03
7 
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.reactive.*
11 import kotlinx.coroutines.selects.*
12 import org.reactivestreams.*
13 import kotlin.coroutines.*
14 
<lambda>null15 fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
16     this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
17         val current = this
18         other.openSubscription().consume { // explicitly open channel to Publisher<U>
19             val other = this
20             whileSelect {
21                 other.onReceive { false }            // bail out on any received element from `other`
22                 current.onReceive { send(it); true } // resend element from this channel and continue
23             }
24         }
25     }
26 }
27 
<lambda>null28 fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
29     for (x in start until start + count) {
30         delay(time) // wait before sending each number
31         send(x)
32     }
33 }
34 
<lambda>null35 fun main() = runBlocking<Unit> {
36     val slowNums = rangeWithInterval(200, 1, 10)         // numbers with 200ms interval
37     val stop = rangeWithInterval(500, 1, 10)             // the first one after 500ms
38     slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
39 }
40