• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.channels.*
5 import reactor.core.publisher.*
6 import kotlin.coroutines.*
7 
/**
 * Exposes this job as a hot reactive [Mono] that signals
 * [success][MonoSink.success] once the corresponding job has completed.
 *
 * All subscribers are signalled at the same time.
 * Unsubscribing from the returned mono **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 */
public fun Job.asMono(context: CoroutineContext): Mono<Unit> {
    // Bind the receiver to a local so the builder lambda can refer to it without a labelled `this`.
    val job = this
    return mono(context) { job.join() }
}
/**
 * Exposes this deferred value as a hot reactive [Mono] that signals either
 * [success][MonoSink.success] or [error][MonoSink.error].
 *
 * All subscribers observe the same completion value.
 * Unsubscribing from the returned mono **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 */
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> {
    // Bind the receiver to a local so the builder lambda can refer to it without a labelled `this`.
    val deferred = this
    return mono(context) { deferred.await() }
}
34 
/**
 * Turns the stream of elements received from this channel into a hot reactive [Flux].
 *
 * Subscribers receive values from this channel in a **fan-out** fashion. If there are multiple subscribers,
 * they'll receive values in a round-robin way.
 * @param context -- the coroutine context from which the resulting flux is going to be signalled
 * @suppress
 */
@Deprecated(
    message = "Deprecated in the favour of consumeAsFlow()",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.consumeAsFlow().asFlux(context)", imports = ["kotlinx.coroutines.flow.consumeAsFlow"])
)
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> {
    // Bind the receiver to a local so the builder lambda can refer to it without a labelled `this`.
    val channel = this
    return flux(context) {
        // Forward every element taken from the channel to the flux, in arrival order.
        for (element in channel) {
            send(element)
        }
    }
}
50