1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.reactor
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import reactor.core.publisher.*
10 import kotlin.coroutines.*
11
12 /**
13 * Converts this job to the hot reactive mono that signals
14 * with [success][MonoSink.success] when the corresponding job completes.
15 *
16 * Every subscriber gets the signal at the same time.
17 * Unsubscribing from the resulting mono **does not** affect the original job in any way.
18 *
19 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
20 * in the future to account for the concept of structured concurrency.
21 *
22 * @param context -- the coroutine context from which the resulting mono is going to be signalled
23 */
<lambda>null24 public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
25 /**
26 * Converts this deferred value to the hot reactive mono that signals
27 * [success][MonoSink.success] or [error][MonoSink.error].
28 *
29 * Every subscriber gets the same completion value.
30 * Unsubscribing from the resulting mono **does not** affect the original deferred value in any way.
31 *
32 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
33 * in the future to account for the concept of structured concurrency.
34 *
35 * @param context -- the coroutine context from which the resulting mono is going to be signalled
36 */
<lambda>null37 public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
38
39 /**
40 * Converts a stream of elements received from the channel to the hot reactive flux.
41 *
42 * Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers,
43 * they'll receive values in a round-robin way.
44 * @param context -- the coroutine context from which the resulting flux is going to be signalled
45 * @suppress
46 */
47 @Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
48 level = DeprecationLevel.HIDDEN,
49 replaceWith = ReplaceWith("this.consumeAsFlow().asFlux(context)", imports = ["kotlinx.coroutines.flow.consumeAsFlow"]))
<lambda>null50 public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = flux(context) {
51 for (t in this@asFlux)
52 send(t)
53 }
54