• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.*
4 import org.reactivestreams.Publisher
5 import org.reactivestreams.Subscriber
6 import org.reactivestreams.Subscription
7 import java.lang.IllegalStateException
8 import kotlin.coroutines.*
9 
10 /**
11  * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
12  * the publisher has produced an error, throws the corresponding exception.
13  *
14  * This suspending function is cancellable.
15  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
16  * function immediately cancels its [Subscription] and resumes with [CancellationException].
17  *
18  * @throws NoSuchElementException if the publisher does not emit any value
19  */
20 public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
21 
22 /**
23  * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
24  * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
25  * exception.
26  *
27  * This suspending function is cancellable.
28  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
29  * function immediately cancels its [Subscription] and resumes with [CancellationException].
30  */
31 public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
32 
33 /**
34  * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
35  * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
36  *
37  * This suspending function is cancellable.
38  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
39  * function immediately cancels its [Subscription] and resumes with [CancellationException].
40  */
41 public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
42 
43 /**
44  * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
45  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
46  * corresponding exception.
47  *
48  * This suspending function is cancellable.
49  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
50  * function immediately cancels its [Subscription] and resumes with [CancellationException].
51  */
52 public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
53 
54 /**
55  * Awaits the last value from the given publisher without blocking the thread and
56  * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
57  *
58  * This suspending function is cancellable.
59  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
60  * function immediately cancels its [Subscription] and resumes with [CancellationException].
61  *
62  * @throws NoSuchElementException if the publisher does not emit any value
63  */
64 public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
65 
66 /**
67  * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
68  * if this publisher has produced an error, throws the corresponding exception.
69  *
70  * This suspending function is cancellable.
71  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
72  * function immediately cancels its [Subscription] and resumes with [CancellationException].
73  *
74  * @throws NoSuchElementException if the publisher does not emit any value
75  * @throws IllegalArgumentException if the publisher emits more than one value
76  */
77 public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
78 
79 /**
80  * Awaits the single value from the given publisher, or returns the [default] value if none is emitted, without
81  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
82  * corresponding exception.
83  *
84  * This suspending function is cancellable.
85  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
86  * function immediately cancels its [Subscription] and resumes with [CancellationException].
87  *
88  * ### Deprecation
89  *
90  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
91  * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would
92  * also mean that this method would return the default value if there are *too many* values. This could be confusing to
93  * those who expect this function to validate that there is a single element or none at all emitted, and cases where
94  * there are no elements are indistinguishable from those where there are too many, though these cases have different
95  * meaning.
96  *
97  * @throws NoSuchElementException if the publisher does not emit any value
98  * @throws IllegalArgumentException if the publisher emits more than one value
99  *
100  * @suppress
101  */
102 @Deprecated(
103     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
104         "Please consider using awaitFirstOrDefault().",
105     level = DeprecationLevel.HIDDEN
106 ) // Warning since 1.5, error in 1.6, hidden in 1.7
107 public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
108 
109 /**
110  * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, if
111  * this publisher has produced an error, throws the corresponding exception. If more than one value or none were
112  * produced by the publisher, `null` is returned.
113  *
114  * This suspending function is cancellable.
115  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
116  * function immediately cancels its [Subscription] and resumes with [CancellationException].
117  *
118  * ### Deprecation
119  *
120  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
121  * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would
122  * also mean that this method would return `null` if there are *too many* values. This could be confusing to
123  * those who expect this function to validate that there is a single element or none at all emitted, and cases where
124  * there are no elements are indistinguishable from those where there are too many, though these cases have different
125  * meaning.
126  *
127  * @throws IllegalArgumentException if the publisher emits more than one value
128  * @suppress
129  */
130 @Deprecated(
131     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
132         "There is a specialized version for Reactor's Mono, please use that where applicable. " +
133         "Alternatively, please consider using awaitFirstOrNull().",
134     level = DeprecationLevel.HIDDEN,
135     replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
136 ) // Warning since 1.5, error in 1.6, hidden in 1.7
137 public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT)
138 
139 /**
140  * Awaits the single value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
141  * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
142  * corresponding exception.
143  *
144  * This suspending function is cancellable.
145  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
146  * function immediately cancels its [Subscription] and resumes with [CancellationException].
147  *
148  * ### Deprecation
149  *
150  * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
151  * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would
152  * also mean that this method would return the calculated value if there are *too many* values. This could be confusing
153  * to those who expect this function to validate that there is a single element or none at all emitted, and cases where
154  * there are no elements are indistinguishable from those where there are too many, though these cases have different
155  * meaning.
156  *
157  * @throws IllegalArgumentException if the publisher emits more than one value
158  * @suppress
159  */
160 @Deprecated(
161     message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
162         "Please consider using awaitFirstOrElse().",
163     level = DeprecationLevel.HIDDEN
164 ) // Warning since 1.5, error in 1.6, hidden in 1.7
165 public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
166     awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()
167 
168 // ------------------------ private ------------------------
169 
170 private enum class Mode(val s: String) {
171     FIRST("awaitFirst"),
172     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
173     LAST("awaitLast"),
174     SINGLE("awaitSingle"),
175     SINGLE_OR_DEFAULT("awaitSingleOrDefault");
176     override fun toString(): String = s
177 }
178 
awaitOnenull179 private suspend fun <T> Publisher<T>.awaitOne(
180     mode: Mode,
181     default: T? = null
182 ): T = suspendCancellableCoroutine { cont ->
183     /* This implementation must obey
184     https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code
185     The numbers of rules are taken from there. */
186     injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
187         // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering
188         private var subscription: Subscription? = null
189         private var value: T? = null
190         private var seenValue = false
191         private var inTerminalState = false
192 
193         override fun onSubscribe(sub: Subscription) {
194             /** cancelling the new subscription due to rule 2.5, though the publisher would either have to
195              * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
196             if (subscription != null) {
197                 withSubscriptionLock {
198                     sub.cancel()
199                 }
200                 return
201             }
202             subscription = sub
203             cont.invokeOnCancellation {
204                 withSubscriptionLock {
205                     sub.cancel()
206                 }
207             }
208             withSubscriptionLock {
209                 sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE)
210             }
211         }
212 
213         override fun onNext(t: T) {
214             val sub = subscription.let {
215                 if (it == null) {
216                     /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */
217                     handleCoroutineException(cont.context,
218                         IllegalStateException("'onNext' was called before 'onSubscribe'"))
219                     return
220                 } else {
221                     it
222                 }
223             }
224             if (inTerminalState) {
225                 gotSignalInTerminalStateException(cont.context, "onNext")
226                 return
227             }
228             when (mode) {
229                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
230                     if (seenValue) {
231                         moreThanOneValueProvidedException(cont.context, mode)
232                         return
233                     }
234                     seenValue = true
235                     withSubscriptionLock {
236                         sub.cancel()
237                     }
238                     cont.resume(t)
239                 }
240                 Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
241                     if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
242                         withSubscriptionLock {
243                             sub.cancel()
244                         }
245                         /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
246                          `onError` on its own. */
247                         if (cont.isActive) {
248                             cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
249                         }
250                     } else {
251                         value = t
252                         seenValue = true
253                     }
254                 }
255             }
256         }
257 
258         @Suppress("UNCHECKED_CAST")
259         override fun onComplete() {
260             if (!tryEnterTerminalState("onComplete")) {
261                 return
262             }
263             if (seenValue) {
264                 /* the check for `cont.isActive` is needed because, otherwise, if the publisher doesn't acknowledge the
265                 call to `cancel` for modes `SINGLE*` when more than one value was seen, it may call `onComplete`, and
266                 here `cont.resume` would fail. */
267                 if (mode != Mode.FIRST_OR_DEFAULT && mode != Mode.FIRST && cont.isActive) {
268                     cont.resume(value as T)
269                 }
270                 return
271             }
272             when {
273                 (mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
274                     cont.resume(default as T)
275                 }
276                 cont.isActive -> {
277                     // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
278                     cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
279                 }
280             }
281         }
282 
283         override fun onError(e: Throwable) {
284             if (tryEnterTerminalState("onError")) {
285                 cont.resumeWithException(e)
286             }
287         }
288 
289         /**
290          * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete].
291          */
292         private fun tryEnterTerminalState(signalName: String): Boolean {
293             if (inTerminalState) {
294                 gotSignalInTerminalStateException(cont.context, signalName)
295                 return false
296             }
297             inTerminalState = true
298             return true
299         }
300 
301         /**
302          * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially
303          */
304         @Synchronized
305         private fun withSubscriptionLock(block: () -> Unit) {
306             block()
307         }
308     })
309 }
310 
311 /**
312  * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): don't process anything after a terminal
313  * state was reached.
314  */
gotSignalInTerminalStateExceptionnull315 private fun gotSignalInTerminalStateException(context: CoroutineContext, signalName: String) =
316     handleCoroutineException(context,
317         IllegalStateException("'$signalName' was called after the publisher already signalled being in a terminal state"))
318 
319 /**
320  * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested.
321  */
322 private fun moreThanOneValueProvidedException(context: CoroutineContext, mode: Mode) =
323     handleCoroutineException(context,
324         IllegalStateException("Only a single value was requested in '$mode', but the publisher provided more"))
325