• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import reactor.core.publisher.*
10 import kotlin.coroutines.*
11 
/**
 * Exposes completion of this job as a hot reactive [Mono].
 *
 * The returned mono signals [success][MonoSink.success] once this job has completed; internally it
 * simply suspends in [Job.join] inside a [mono] builder running in the supplied [context].
 *
 * All subscribers observe the completion signal at the same time.
 * Cancelling a subscription to the returned mono **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 */
public fun Job.asMono(context: CoroutineContext): Mono<Unit> {
    // Capture the receiver explicitly so the builder lambda does not need a labelled `this`.
    val job = this
    return mono(context) { job.join() }
}
/**
 * Exposes the result of this deferred value as a hot reactive [Mono].
 *
 * The returned mono is produced by a [mono] builder running in the supplied [context] whose body
 * awaits this deferred; it signals [success][MonoSink.success] or [error][MonoSink.error].
 *
 * All subscribers observe the same completion value.
 * Cancelling a subscription to the returned mono **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 */
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> {
    // Capture the receiver explicitly so the builder lambda does not need a labelled `this`.
    val deferred = this
    return mono(context) { deferred.await() }
}
38 
/**
 * Turns the elements received from this channel into a hot reactive [Flux].
 *
 * Subscribers share the channel in a **fan-out** fashion: if there are multiple subscribers,
 * they'll receive values in a round-robin way rather than each seeing every element.
 *
 * @param context -- the coroutine context from which the resulting flux is going to be signalled
 * @suppress
 */
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.consumeAsFlow().asFlux(context)", imports = ["kotlinx.coroutines.flow.consumeAsFlow"]))
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> {
    val source = this
    return flux(context) {
        // Explicit form of `for (t in source) send(t)`: pull each element and forward it downstream.
        val elements = source.iterator()
        while (elements.hasNext()) {
            send(elements.next())
        }
    }
}
54