package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import reactor.core.publisher.*
import kotlin.coroutines.*

/**
 * Exposes this job as a hot reactive [Mono] that signals
 * [success][MonoSink.success] once the corresponding job completes.
 *
 * All subscribers are signalled at the same time.
 * Unsubscribing from the returned mono **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context the coroutine context from which the resulting mono is going to be signalled
 * @return a mono whose coroutine suspends on [Job.join] of this job
 */
public fun Job.asMono(context: CoroutineContext): Mono<Unit> {
    // Bind the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val job = this
    return mono(context) { job.join() }
}

/**
 * Exposes this deferred value as a hot reactive [Mono] that signals
 * [success][MonoSink.success] or [error][MonoSink.error].
 *
 * Every subscriber observes the same completion value.
 * Unsubscribing from the returned mono **does not** affect the original deferred value in any way.
 *
 * The deferred's value type is nullable while the mono's element type is not; how a `null` result
 * is signalled is determined by the `mono` builder.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context the coroutine context from which the resulting mono is going to be signalled
 * @return a mono whose coroutine suspends on [Deferred.await] of this deferred value
 */
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> {
    // Bind the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val deferred = this
    return mono(context) { deferred.await() }
}

/**
 * Converts a stream of elements received from the channel to the hot reactive flux.
 *
 * Every subscriber receives values from this channel in a **fan-out** fashion. If there are multiple subscribers,
 * they'll receive values in a round-robin way.
 *
 * @param context the coroutine context from which the resulting flux is going to be signalled
 * @suppress
 */
@Deprecated(
    message = "Deprecated in the favour of consumeAsFlow()",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.consumeAsFlow().asFlux(context)", imports = ["kotlinx.coroutines.flow.consumeAsFlow"])
)
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> {
    val channel = this
    return flux(context) {
        // Forward every element received from the channel to the flux, in order.
        for (element in channel) {
            send(element)
        }
    }
}