/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.coroutines.*
8 import org.reactivestreams.Publisher
9 import org.reactivestreams.Subscriber
10 import org.reactivestreams.Subscription
11 import java.lang.IllegalStateException
12 import kotlin.coroutines.*
13
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher signals an error instead, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @return the first value emitted by the publisher
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitFirst(): T {
    return awaitOne(mode = Mode.FIRST)
}
25
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, [default] is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @param default the value to return when the publisher emits nothing
 * @return the first emitted value, or [default] if there is none
 */
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT, default = default)
}
36
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, `null` is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @return the first emitted value, or `null` if there is none
 */
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
46
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, [defaultValue] is invoked and its result is returned.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @param defaultValue supplier of the fallback value; only called when no value was obtained from the publisher
 * @return the first emitted value, or the result of [defaultValue] if there is none
 */
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // With no explicit default, `awaitOne` yields `null` when the publisher completes empty.
    val first = awaitOne(mode = Mode.FIRST_OR_DEFAULT)
    return first ?: defaultValue()
}
57
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the last value it emitted.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @return the last value emitted before completion
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitLast(): T {
    return awaitOne(mode = Mode.LAST)
}
69
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the one value it emitted.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @return the only value emitted by the publisher
 * @throws NoSuchElementException if the publisher completes without emitting any value
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingle(): T {
    return awaitOne(mode = Mode.SINGLE)
}
82
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the one value it emitted,
 * or [default] if it completed without emitting anything. If the publisher signals an error, the corresponding
 * exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Kotlin naming conventions imply that an operation called `awaitSingleOrDefault` falls back to the default value
 * wherever the plain operation would throw. Following that convention here would mean returning the default value
 * when there are *too many* values as well, which makes "no elements" indistinguishable from "too many elements"
 * even though the two situations mean different things. This function does not behave that way (it throws when more
 * than one value is emitted), which can surprise callers in either direction, hence the deprecation.
 *
 * @param default the value to return when the publisher emits nothing
 * @throws IllegalArgumentException if the publisher emits more than one value
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "Please consider using awaitFirstOrDefault().",
    level = DeprecationLevel.ERROR
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T {
    return awaitOne(mode = Mode.SINGLE_OR_DEFAULT, default = default)
}
112
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the one value it emitted,
 * or `null` if it completed without emitting anything. If the publisher signals an error, the corresponding
 * exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Kotlin naming conventions imply that an operation called `awaitSingleOrNull` returns `null` wherever the plain
 * operation would throw. Following that convention here would mean returning `null` when there are *too many*
 * values as well, which makes "no elements" indistinguishable from "too many elements" even though the two
 * situations mean different things. This function does not behave that way (it throws when more than one value is
 * emitted), which can surprise callers in either direction, hence the deprecation.
 *
 * @throws IllegalArgumentException if the publisher emits more than one value
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "There is a specialized version for Reactor's Mono, please use that where applicable. " +
        "Alternatively, please consider using awaitFirstOrNull().",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? {
    return awaitOne(mode = Mode.SINGLE_OR_DEFAULT)
}
142
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the one value it emitted,
 * or the result of calling [defaultValue] if it completed without emitting anything. If the publisher signals an
 * error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Kotlin naming conventions imply that an operation called `awaitSingleOrElse` returns the calculated value
 * wherever the plain operation would throw. Following that convention here would mean returning the calculated
 * value when there are *too many* values as well, which makes "no elements" indistinguishable from "too many
 * elements" even though the two situations mean different things. This function does not behave that way (it throws
 * when more than one value is emitted), which can surprise callers in either direction, hence the deprecation.
 *
 * @param defaultValue supplier of the fallback value; only called when no value was obtained from the publisher
 * @throws IllegalArgumentException if the publisher emits more than one value
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "Please consider using awaitFirstOrElse().",
    level = DeprecationLevel.ERROR
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T {
    // With no explicit default, `awaitOne` yields `null` when the publisher completes empty.
    val single = awaitOne(mode = Mode.SINGLE_OR_DEFAULT)
    return single ?: defaultValue()
}
171
172 // ------------------------ private ------------------------
173
/**
 * Selects which element `awaitOne` hands back to its caller.
 *
 * @property s the name rendered by [toString], which is how the mode appears in exception messages
 */
private enum class Mode(val s: String) {
    /** Take the first element; an empty publisher is an error. */
    FIRST("awaitFirst"),

    /** Take the first element; an empty publisher yields the default value. */
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),

    /** Take the last element seen before completion; an empty publisher is an error. */
    LAST("awaitLast"),

    /** Take the only element; an empty publisher or a second element is an error. */
    SINGLE("awaitSingle"),

    /** Take the only element; an empty publisher yields the default value, a second element is an error. */
    SINGLE_OR_DEFAULT("awaitSingleOrDefault");

    override fun toString(): String {
        return s
    }
}
182
/**
 * Subscribes to this publisher and suspends until the one result selected by [mode] is available.
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] request a single element, cancel the subscription as soon as it arrives
 *   and resume with it.
 * - [Mode.LAST] requests [Long.MAX_VALUE] elements and, on completion, resumes with the most recently received one.
 * - [Mode.SINGLE] and [Mode.SINGLE_OR_DEFAULT] also request [Long.MAX_VALUE] elements and, on completion, resume
 *   with the only element received; a second element cancels the subscription and resumes with
 *   [IllegalArgumentException].
 *
 * When the publisher completes without having emitted anything, the `*_OR_DEFAULT` modes resume with [default]
 * (through an unchecked cast to `T`, so the result is `null` when no default was passed), and the other modes
 * resume with [NoSuchElementException]. An `onError` signal resumes the coroutine with the signalled exception.
 * Cancelling the continuation cancels the subscription.
 *
 * Signals that break the Reactive Streams protocol (`onNext` before `onSubscribe`, any signal after a terminal
 * one, more elements than requested) do not resume the coroutine; they are reported via [handleCoroutineException].
 *
 * @param mode which element to return and how empty or multi-element publishers are treated
 * @param default value used by the `*_OR_DEFAULT` modes when nothing was emitted
 */
private suspend fun <T> Publisher<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    /* This implementation must obey
    https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code
    The numbers of rules are taken from there. */
    // NOTE(review): `injectCoroutineContext` is defined outside this file; presumably it makes the coroutine
    // context available to the publisher - confirm against its declaration.
    injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
        // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering
        // Set once in `onSubscribe`; `null` means that `onSubscribe` has not been received yet.
        private var subscription: Subscription? = null
        // Most recent element in the LAST and SINGLE* modes; never written in the FIRST* modes.
        private var value: T? = null
        // Whether at least one element was accepted in `onNext`.
        private var seenValue = false
        // Whether `onComplete` or `onError` was already received.
        private var inTerminalState = false

        override fun onSubscribe(sub: Subscription) {
            /** cancelling the new subscription due to rule 2.5, though the publisher would either have to
             * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
            if (subscription != null) {
                withSubscriptionLock {
                    sub.cancel()
                }
                return
            }
            subscription = sub
            // Propagate cancellation of the coroutine to the publisher.
            cont.invokeOnCancellation {
                withSubscriptionLock {
                    sub.cancel()
                }
            }
            withSubscriptionLock {
                // The FIRST* modes need exactly one element; every other mode has to observe the whole stream.
                sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE)
            }
        }

        override fun onNext(t: T) {
            val sub = subscription.let {
                if (it == null) {
                    /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */
                    handleCoroutineException(cont.context,
                        IllegalStateException("'onNext' was called before 'onSubscribe'"))
                    return
                } else {
                    it
                }
            }
            if (inTerminalState) {
                gotSignalInTerminalStateException(cont.context, "onNext")
                return
            }
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    if (seenValue) {
                        // Only one element was requested, so a second one is a protocol violation by the publisher.
                        moreThanOneValueProvidedException(cont.context, mode)
                        return
                    }
                    seenValue = true
                    // The subscription is cancelled before the coroutine is resumed.
                    withSubscriptionLock {
                        sub.cancel()
                    }
                    cont.resume(t)
                }
                Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
                    if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
                        withSubscriptionLock {
                            sub.cancel()
                        }
                        /* the check for `cont.isActive` is needed in case `sub.cancel()` above calls `onComplete` or
                        `onError` on its own. */
                        if (cont.isActive) {
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                        }
                    } else {
                        // Remember the element; the coroutine is resumed with it from `onComplete`.
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (!tryEnterTerminalState("onComplete")) {
                return
            }
            if (seenValue) {
                /* the check for `cont.isActive` is needed because, otherwise, if the publisher doesn't acknowledge the
                call to `cancel` for modes `SINGLE*` when more than one value was seen, it may call `onComplete`, and
                here `cont.resume` would fail. */
                // In the FIRST* modes the coroutine was already resumed from `onNext`, so there is nothing to do.
                if (mode != Mode.FIRST_OR_DEFAULT && mode != Mode.FIRST && cont.isActive) {
                    cont.resume(value as T)
                }
                return
            }
            // No element was received at all.
            when {
                (mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            if (tryEnterTerminalState("onError")) {
                cont.resumeWithException(e)
            }
        }

        /**
         * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete].
         *
         * Returns `true` if this is the first terminal signal; otherwise reports the violation for [signalName]
         * and returns `false`.
         */
        private fun tryEnterTerminalState(signalName: String): Boolean {
            if (inTerminalState) {
                gotSignalInTerminalStateException(cont.context, signalName)
                return false
            }
            inTerminalState = true
            return true
        }

        /**
         * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially
         *
         * Runs [block] inside this `@Synchronized` method of the subscriber object.
         */
        @Synchronized
        private fun withSubscriptionLock(block: () -> Unit) {
            block()
        }
    })
}
314
/**
 * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): nothing is processed after a terminal state
 * was reached. Instead, an [IllegalStateException] naming the offending signal is passed to
 * [handleCoroutineException].
 *
 * @param context the coroutine context handed to [handleCoroutineException]
 * @param signalName name of the subscriber callback that was invoked after the terminal signal
 */
private fun gotSignalInTerminalStateException(context: CoroutineContext, signalName: String) {
    val violation = IllegalStateException(
        "'$signalName' was called after the publisher already signalled being in a terminal state"
    )
    handleCoroutineException(context, violation)
}
322
/**
 * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested. The violation is passed
 * to [handleCoroutineException] as an [IllegalStateException] that names the awaiting [mode].
 *
 * @param context the coroutine context handed to [handleCoroutineException]
 * @param mode the mode in which a single value had been requested
 */
private fun moreThanOneValueProvidedException(context: CoroutineContext, mode: Mode) {
    val violation = IllegalStateException(
        "Only a single value was requested in '$mode', but the publisher provided more"
    )
    handleCoroutineException(context, violation)
}
329