• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.reactivestreams.Publisher
9 import org.reactivestreams.Subscriber
10 import org.reactivestreams.Subscription
11 import java.lang.IllegalStateException
12 import kotlin.coroutines.*
13 
14 /**
15  * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
16  * the publisher has produced an error, throws the corresponding exception.
17  *
18  * This suspending function is cancellable.
19  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
20  * function immediately cancels its [Subscription] and resumes with [CancellationException].
21  *
22  * @throws NoSuchElementException if the publisher does not emit any value
23  */
24 public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
25 
26 /**
27  * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
28  * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
29  * exception.
30  *
31  * This suspending function is cancellable.
32  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
33  * function immediately cancels its [Subscription] and resumes with [CancellationException].
34  */
35 public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
36 
37 /**
38  * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
39  * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
40  *
41  * This suspending function is cancellable.
42  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
43  * function immediately cancels its [Subscription] and resumes with [CancellationException].
44  */
45 public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
46 
47 /**
48  * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
49  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
50  * corresponding exception.
51  *
52  * This suspending function is cancellable.
53  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
54  * function immediately cancels its [Subscription] and resumes with [CancellationException].
55  */
56 public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
57 
58 /**
59  * Awaits the last value from the given publisher without blocking the thread and
60  * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
61  *
62  * This suspending function is cancellable.
63  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
64  * function immediately cancels its [Subscription] and resumes with [CancellationException].
65  *
66  * @throws NoSuchElementException if the publisher does not emit any value
67  */
68 public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
69 
70 /**
71  * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
72  * if this publisher has produced an error, throws the corresponding exception.
73  *
74  * This suspending function is cancellable.
75  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
76  * function immediately cancels its [Subscription] and resumes with [CancellationException].
77  *
78  * @throws NoSuchElementException if the publisher does not emit any value
79  * @throws IllegalArgumentException if the publisher emits more than one value
80  */
81 public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
82 
83 /**
84  * Awaits the single value from the given publisher, or returns the [default] value if none is emitted, without
85  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
86  * corresponding exception.
87  *
88  * This suspending function is cancellable.
89  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
90  * function immediately cancels its [Subscription] and resumes with [CancellationException].
91  *
92  * ### Deprecation
93  *
94  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
95  * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would
96  * also mean that this method would return the default value if there are *too many* values. This could be confusing to
97  * those who expect this function to validate that there is a single element or none at all emitted, and cases where
98  * there are no elements are indistinguishable from those where there are too many, though these cases have different
99  * meaning.
100  *
101  * @throws NoSuchElementException if the publisher does not emit any value
102  * @throws IllegalArgumentException if the publisher emits more than one value
103  *
104  * @suppress
105  */
106 @Deprecated(
107     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
108         "Please consider using awaitFirstOrDefault().",
109     level = DeprecationLevel.ERROR
110 ) // Warning since 1.5, error in 1.6, hidden in 1.7
111 public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
112 
113 /**
114  * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, if
115  * this publisher has produced an error, throws the corresponding exception. If more than one value or none were
116  * produced by the publisher, `null` is returned.
117  *
118  * This suspending function is cancellable.
119  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
120  * function immediately cancels its [Subscription] and resumes with [CancellationException].
121  *
122  * ### Deprecation
123  *
124  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
125  * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would
126  * also mean that this method would return `null` if there are *too many* values. This could be confusing to
127  * those who expect this function to validate that there is a single element or none at all emitted, and cases where
128  * there are no elements are indistinguishable from those where there are too many, though these cases have different
129  * meaning.
130  *
131  * @throws IllegalArgumentException if the publisher emits more than one value
132  * @suppress
133  */
134 @Deprecated(
135     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
136         "There is a specialized version for Reactor's Mono, please use that where applicable. " +
137         "Alternatively, please consider using awaitFirstOrNull().",
138     level = DeprecationLevel.ERROR,
139     replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
140 ) // Warning since 1.5, error in 1.6, hidden in 1.7
141 public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT)
142 
143 /**
144  * Awaits the single value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
145  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
146  * corresponding exception.
147  *
148  * This suspending function is cancellable.
149  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
150  * function immediately cancels its [Subscription] and resumes with [CancellationException].
151  *
152  * ### Deprecation
153  *
154  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
155  * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would
156  * also mean that this method would return the calculated value if there are *too many* values. This could be confusing
157  * to those who expect this function to validate that there is a single element or none at all emitted, and cases where
158  * there are no elements are indistinguishable from those where there are too many, though these cases have different
159  * meaning.
160  *
161  * @throws IllegalArgumentException if the publisher emits more than one value
162  * @suppress
163  */
164 @Deprecated(
165     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
166         "Please consider using awaitFirstOrElse().",
167     level = DeprecationLevel.ERROR
168 ) // Warning since 1.5, error in 1.6, hidden in 1.7
169 public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
170     awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()
171 
172 // ------------------------ private ------------------------
173 
174 private enum class Mode(val s: String) {
175     FIRST("awaitFirst"),
176     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
177     LAST("awaitLast"),
178     SINGLE("awaitSingle"),
179     SINGLE_OR_DEFAULT("awaitSingleOrDefault");
180     override fun toString(): String = s
181 }
182 
awaitOnenull183 private suspend fun <T> Publisher<T>.awaitOne(
184     mode: Mode,
185     default: T? = null
186 ): T = suspendCancellableCoroutine { cont ->
187     /* This implementation must obey
188     https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code
189     The numbers of rules are taken from there. */
190     injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
191         // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering
192         private var subscription: Subscription? = null
193         private var value: T? = null
194         private var seenValue = false
195         private var inTerminalState = false
196 
197         override fun onSubscribe(sub: Subscription) {
198             /** cancelling the new subscription due to rule 2.5, though the publisher would either have to
199              * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
200             if (subscription != null) {
201                 withSubscriptionLock {
202                     sub.cancel()
203                 }
204                 return
205             }
206             subscription = sub
207             cont.invokeOnCancellation {
208                 withSubscriptionLock {
209                     sub.cancel()
210                 }
211             }
212             withSubscriptionLock {
213                 sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE)
214             }
215         }
216 
217         override fun onNext(t: T) {
218             val sub = subscription.let {
219                 if (it == null) {
220                     /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */
221                     handleCoroutineException(cont.context,
222                         IllegalStateException("'onNext' was called before 'onSubscribe'"))
223                     return
224                 } else {
225                     it
226                 }
227             }
228             if (inTerminalState) {
229                 gotSignalInTerminalStateException(cont.context, "onNext")
230                 return
231             }
232             when (mode) {
233                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
234                     if (seenValue) {
235                         moreThanOneValueProvidedException(cont.context, mode)
236                         return
237                     }
238                     seenValue = true
239                     withSubscriptionLock {
240                         sub.cancel()
241                     }
242                     cont.resume(t)
243                 }
244                 Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
245                     if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
246                         withSubscriptionLock {
247                             sub.cancel()
248                         }
249                         /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
250                          `onError` on its own. */
251                         if (cont.isActive) {
252                             cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
253                         }
254                     } else {
255                         value = t
256                         seenValue = true
257                     }
258                 }
259             }
260         }
261 
262         @Suppress("UNCHECKED_CAST")
263         override fun onComplete() {
264             if (!tryEnterTerminalState("onComplete")) {
265                 return
266             }
267             if (seenValue) {
268                 /* the check for `cont.isActive` is needed because, otherwise, if the publisher doesn't acknowledge the
269                 call to `cancel` for modes `SINGLE*` when more than one value was seen, it may call `onComplete`, and
270                 here `cont.resume` would fail. */
271                 if (mode != Mode.FIRST_OR_DEFAULT && mode != Mode.FIRST && cont.isActive) {
272                     cont.resume(value as T)
273                 }
274                 return
275             }
276             when {
277                 (mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
278                     cont.resume(default as T)
279                 }
280                 cont.isActive -> {
281                     // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
282                     cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
283                 }
284             }
285         }
286 
287         override fun onError(e: Throwable) {
288             if (tryEnterTerminalState("onError")) {
289                 cont.resumeWithException(e)
290             }
291         }
292 
293         /**
294          * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete].
295          */
296         private fun tryEnterTerminalState(signalName: String): Boolean {
297             if (inTerminalState) {
298                 gotSignalInTerminalStateException(cont.context, signalName)
299                 return false
300             }
301             inTerminalState = true
302             return true
303         }
304 
305         /**
306          * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially
307          */
308         @Synchronized
309         private fun withSubscriptionLock(block: () -> Unit) {
310             block()
311         }
312     })
313 }
314 
315 /**
316  * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): don't process anything after a terminal
317  * state was reached.
318  */
gotSignalInTerminalStateExceptionnull319 private fun gotSignalInTerminalStateException(context: CoroutineContext, signalName: String) =
320     handleCoroutineException(context,
321         IllegalStateException("'$signalName' was called after the publisher already signalled being in a terminal state"))
322 
323 /**
324  * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested.
325  */
326 private fun moreThanOneValueProvidedException(context: CoroutineContext, mode: Mode) =
327     handleCoroutineException(context,
328         IllegalStateException("Only a single value was requested in '$mode', but the publisher provided more"))
329