• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6 
7 package kotlinx.coroutines.reactor
8 
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.reactive.*
11 import org.reactivestreams.*
12 import reactor.core.*
13 import reactor.core.publisher.*
14 import kotlin.coroutines.*
15 import kotlinx.coroutines.internal.*
16 
17 /**
18  * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
19  * Every time the returned mono is subscribed, it starts a new coroutine.
20  * If the result of [block] is `null`, [MonoSink.success] is invoked without a value.
21  * Unsubscribing cancels the running coroutine.
22  *
23  * Coroutine context can be specified with [context] argument.
24  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
25  *
26  * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
27  */
28 public fun <T> mono(
29     context: CoroutineContext = EmptyCoroutineContext,
30     block: suspend CoroutineScope.() -> T?
31 ): Mono<T> {
32     require(context[Job] === null) { "Mono context cannot contain job in it." +
33             "Its lifecycle should be managed via Disposable handle. Had $context" }
34     return monoInternal(GlobalScope, context, block)
35 }
36 
37 /**
38  * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if
39  * this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value,
40  * `null` is returned.
41  *
42  * This suspending function is cancellable.
43  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
44  * function immediately cancels its [Subscription] and resumes with [CancellationException].
45  */
awaitSingleOrNullnull46 public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
47     injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
48         private var value: T? = null
49 
50         override fun onSubscribe(s: Subscription) {
51             cont.invokeOnCancellation { s.cancel() }
52             s.request(Long.MAX_VALUE)
53         }
54 
55         override fun onComplete() {
56             cont.resume(value)
57             value = null
58         }
59 
60         override fun onNext(t: T) {
61             // We don't return the value immediately because the process that emitted it may not be finished yet.
62             // Resuming now could lead to race conditions between emitter and the awaiting code.
63             value = t
64         }
65 
66         override fun onError(error: Throwable) { cont.resumeWithException(error) }
67     })
68 }
69 
70 /**
71  * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or,
72  * if this Mono has produced an error, throws the corresponding exception.
73  *
74  * This suspending function is cancellable.
75  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
76  * function immediately cancels its [Subscription] and resumes with [CancellationException].
77  *
78  * @throws NoSuchElementException if the Mono does not emit any value
79  */
80 // TODO: consider using https://github.com/Kotlin/kotlinx.coroutines/issues/2607 once that lands
awaitSinglenull81 public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
82 
83 private fun <T> monoInternal(
84     scope: CoroutineScope, // support for legacy mono in scope
85     context: CoroutineContext,
86     block: suspend CoroutineScope.() -> T?
87 ): Mono<T> = Mono.create { sink ->
88     val reactorContext = context.extendReactorContext(sink.currentContext())
89     val newContext = scope.newCoroutineContext(context + reactorContext)
90     val coroutine = MonoCoroutine(newContext, sink)
91     sink.onDispose(coroutine)
92     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
93 }
94 
95 private class MonoCoroutine<in T>(
96     parentContext: CoroutineContext,
97     private val sink: MonoSink<T>
98 ) : AbstractCoroutine<T>(parentContext, false, true), Disposable {
99     @Volatile
100     private var disposed = false
101 
onCompletednull102     override fun onCompleted(value: T) {
103         if (value == null) sink.success() else sink.success(value)
104     }
105 
onCancellednull106     override fun onCancelled(cause: Throwable, handled: Boolean) {
107         /** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
108         val unwrappedCause = unwrap(cause)
109         if (getCancellationException() !== unwrappedCause || !disposed) {
110             try {
111                 /** If [sink] turns out to already be in a terminal state, this exception will be passed through the
112                  * [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
113                 sink.error(cause)
114             } catch (e: Throwable) {
115                 // In case of improper error implementation or fatal exceptions
116                 cause.addSuppressed(e)
117                 handleCoroutineException(context, cause)
118             }
119         }
120     }
121 
disposenull122     override fun dispose() {
123         disposed = true
124         cancel()
125     }
126 
isDisposednull127     override fun isDisposed(): Boolean = disposed
128 }
129 
130 /**
131  * @suppress
132  */
133 @Deprecated(
134     message = "CoroutineScope.mono is deprecated in favour of top-level mono",
135     level = DeprecationLevel.HIDDEN,
136     replaceWith = ReplaceWith("mono(context, block)")
137 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
138 public fun <T> CoroutineScope.mono(
139     context: CoroutineContext = EmptyCoroutineContext,
140     block: suspend CoroutineScope.() -> T?
141 ): Mono<T> = monoInternal(this, context, block)
142 
143 /**
144  * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
145  * On [Publisher] instances other than [Mono], this function is not deprecated.
146  *
147  * Both [awaitFirst] and [awaitSingle] await the first value, or throw [NoSuchElementException] if there is none, but
148  * the name [Mono.awaitSingle] better reflects the semantics of [Mono].
149  *
150  * For example, consider this code:
151  * ```
152  * myDbClient.findById(uniqueId).awaitFirst() // findById returns a `Mono`
153  * ```
154  * It looks like more than one value could be returned from `findById` and [awaitFirst] discards the extra elements,
155  * when in fact, at most a single value can be present.
156  *
157  * @suppress
158  */
159 @Deprecated(
160     message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
161         "Please use awaitSingle() instead.",
162     level = DeprecationLevel.HIDDEN,
163     replaceWith = ReplaceWith("this.awaitSingle()")
164 ) // Warning since 1.5, error in 1.6
165 public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()
166 
167 /**
168  * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
169  * On [Publisher] instances other than [Mono], this function is not deprecated.
170  *
171  * Both [awaitFirstOrDefault] and [awaitSingleOrNull] await the first value, or return some special value if there
172  * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
173  *
174  * For example, consider this code:
175  * ```
176  * myDbClient.findById(uniqueId).awaitFirstOrDefault(default) // findById returns a `Mono`
177  * ```
178  * It looks like more than one value could be returned from `findById` and [awaitFirstOrDefault] discards the extra
179  * elements, when in fact, at most a single value can be present.
180  *
181  * @suppress
182  */
183 @Deprecated(
184     message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
185         "Please use awaitSingleOrNull() instead.",
186     level = DeprecationLevel.HIDDEN,
187     replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
188 ) // Warning since 1.5, error in 1.6
189 public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default
190 
191 /**
192  * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
193  * On [Publisher] instances other than [Mono], this function is not deprecated.
194  *
195  * Both [awaitFirstOrNull] and [awaitSingleOrNull] await the first value, or return some special value if there
196  * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
197  *
198  * For example, consider this code:
199  * ```
200  * myDbClient.findById(uniqueId).awaitFirstOrNull() // findById returns a `Mono`
201  * ```
202  * It looks like more than one value could be returned from `findById` and [awaitFirstOrNull] discards the extra
203  * elements, when in fact, at most a single value can be present.
204  *
205  * @suppress
206  */
207 @Deprecated(
208     message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
209         "Please use awaitSingleOrNull() instead.",
210     level = DeprecationLevel.HIDDEN,
211     replaceWith = ReplaceWith("this.awaitSingleOrNull()")
212 ) // Warning since 1.5, error in 1.6
213 public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()
214 
215 /**
216  * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
217  * On [Publisher] instances other than [Mono], this function is not deprecated.
218  *
219  * Both [awaitFirstOrElse] and [awaitSingleOrNull] await the first value, or return some special value if there
220  * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
221  *
222  * For example, consider this code:
223  * ```
224  * myDbClient.findById(uniqueId).awaitFirstOrElse(defaultValue) // findById returns a `Mono`
225  * ```
226  * It looks like more than one value could be returned from `findById` and [awaitFirstOrElse] discards the extra
227  * elements, when in fact, at most a single value can be present.
228  *
229  * @suppress
230  */
231 @Deprecated(
232     message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
233         "Please use awaitSingleOrNull() instead.",
234     level = DeprecationLevel.HIDDEN,
235     replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
236 ) // Warning since 1.5, error in 1.6
237 public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()
238 
239 /**
240  * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
241  * On [Publisher] instances other than [Mono], this function is not deprecated.
242  *
243  * Both [awaitLast] and [awaitSingle] await the single value, or throw [NoSuchElementException] if there is none, but
244  * the name [Mono.awaitSingle] better reflects the semantics of [Mono].
245  *
246  * For example, consider this code:
247  * ```
248  * myDbClient.findById(uniqueId).awaitLast() // findById returns a `Mono`
249  * ```
250  * It looks like more than one value could be returned from `findById` and [awaitLast] discards the initial elements,
251  * when in fact, at most a single value can be present.
252  *
253  * @suppress
254  */
255 @Deprecated(
256     message = "Mono produces at most one value, so the last element is the same as the first. " +
257         "Please use awaitSingle() instead.",
258     level = DeprecationLevel.HIDDEN,
259     replaceWith = ReplaceWith("this.awaitSingle()")
260 ) // Warning since 1.5, error in 1.6
261 public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()
262