<lambda>null1 package kotlinx.coroutines.reactive
2
3 import kotlinx.coroutines.*
4 import org.reactivestreams.Publisher
5 import org.reactivestreams.Subscriber
6 import org.reactivestreams.Subscription
7 import java.lang.IllegalStateException
8 import kotlin.coroutines.*
9
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher signals an error instead, the corresponding exception is thrown.
 *
 * Only one element is requested from the publisher, and its [Subscription] is cancelled as soon as that element
 * arrives.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitFirst(): T {
    return awaitOne(Mode.FIRST)
}
21
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, [default] is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 */
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(Mode.FIRST_OR_DEFAULT, default)
}
32
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, `null` is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 */
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? {
    // No explicit default is passed, so an empty publisher yields the `null` default of the shared implementation.
    return awaitOne(Mode.FIRST_OR_DEFAULT)
}
42
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher completes without emitting anything, [defaultValue] is invoked and its result is returned.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 */
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // An empty publisher is reported as `null` here; only then is the fallback computed.
    val firstOrNull = awaitOne(Mode.FIRST_OR_DEFAULT)
    return firstOrNull ?: defaultValue()
}
53
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the last value it emitted.
 * If the publisher signals an error instead, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitLast(): T {
    return awaitOne(Mode.LAST)
}
65
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the only value it emitted.
 * If the publisher signals an error instead, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher completes without emitting any value
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingle(): T {
    return awaitOne(Mode.SINGLE)
}
78
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the only value it emitted.
 * If the publisher completes without emitting anything, [default] is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
 * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this
 * would also mean that this method would return the default value if there are *too many* values. That could
 * confuse those who expect this function to validate that exactly one element or none at all is emitted, since the
 * case of no elements would become indistinguishable from the case of too many, although the two have different
 * meaning.
 *
 * @throws IllegalArgumentException if the publisher emits more than one value
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "Please consider using awaitFirstOrDefault().",
    level = DeprecationLevel.HIDDEN
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T {
    return awaitOne(Mode.SINGLE_OR_DEFAULT, default)
}
108
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the only value it emitted.
 * If the publisher completes without emitting anything, `null` is returned instead.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
 * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would
 * also mean that this method would return `null` if there are *too many* values. That could confuse those who
 * expect this function to validate that exactly one element or none at all is emitted, since the case of no
 * elements would become indistinguishable from the case of too many, although the two have different meaning.
 *
 * @throws IllegalArgumentException if the publisher emits more than one value
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "There is a specialized version for Reactor's Mono, please use that where applicable. " +
        "Alternatively, please consider using awaitFirstOrNull().",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? {
    // No explicit default is passed, so an empty publisher yields the `null` default of the shared implementation.
    return awaitOne(Mode.SINGLE_OR_DEFAULT)
}
138
/**
 * Suspends, without blocking the thread, until this publisher completes and returns the only value it emitted.
 * If the publisher completes without emitting anything, [defaultValue] is invoked and its result is returned.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, the [Subscription] is
 * cancelled immediately and the function resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name
 * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this
 * would also mean that this method would return the calculated value if there are *too many* values. That could
 * confuse those who expect this function to validate that exactly one element or none at all is emitted, since the
 * case of no elements would become indistinguishable from the case of too many, although the two have different
 * meaning.
 *
 * @throws IllegalArgumentException if the publisher emits more than one value
 * @suppress
 */
@Deprecated(
    message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
        "Please consider using awaitFirstOrElse().",
    level = DeprecationLevel.HIDDEN
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T {
    // An empty publisher is reported as `null` here; only then is the fallback computed.
    val singleOrNull = awaitOne(Mode.SINGLE_OR_DEFAULT)
    return singleOrNull ?: defaultValue()
}
167
168 // ------------------------ private ------------------------
169
/**
 * Selects which element of the publisher an await operation waits for.
 *
 * [s] is the text produced by [toString]; it is what appears wherever a mode is interpolated into a message.
 */
private enum class Mode(val s: String) {
    /** Wait for the first element; an empty publisher is an error. */
    FIRST(s = "awaitFirst"),

    /** Wait for the first element; an empty publisher yields the default. */
    FIRST_OR_DEFAULT(s = "awaitFirstOrDefault"),

    /** Wait for completion and take the last element. */
    LAST(s = "awaitLast"),

    /** Wait for completion and take the only element; an empty publisher is an error. */
    SINGLE(s = "awaitSingle"),

    /** Wait for completion and take the only element; an empty publisher yields the default. */
    SINGLE_OR_DEFAULT(s = "awaitSingleOrDefault");

    override fun toString(): String {
        return s
    }
}
178
/**
 * Shared implementation of all the `await*` operations: subscribes to this publisher and suspends until the
 * result selected by [mode] is known.
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] request a single element, cancel the [Subscription] when it arrives
 *   and resume with it.
 * - [Mode.LAST], [Mode.SINGLE] and [Mode.SINGLE_OR_DEFAULT] request `Long.MAX_VALUE` elements and resume on
 *   completion with the most recently stored element. For the `SINGLE*` modes a second element cancels the
 *   [Subscription] and resumes with [IllegalArgumentException].
 * - If the publisher completes without emitting, the `*_OR_DEFAULT` modes resume with [default] and the other modes
 *   resume with [NoSuchElementException].
 * - If the publisher signals an error while the continuation is still active, the continuation is resumed with
 *   that error.
 *
 * Cancelling the continuation cancels the [Subscription].
 *
 * @param mode which element to wait for and how to treat an empty publisher
 * @param default the value to resume with in the `*_OR_DEFAULT` modes when nothing was emitted; it is cast to [T]
 * unchecked, so passing `null` for a non-nullable [T] is only safe if the caller handles the `null` itself
 */
private suspend fun <T> Publisher<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    /* This implementation must obey
    https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code
    The numbers of rules are taken from there. */
    injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
        // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering
        private var subscription: Subscription? = null
        private var value: T? = null
        private var seenValue = false
        private var inTerminalState = false

        override fun onSubscribe(sub: Subscription) {
            /** cancelling the new subscription due to rule 2.5, though the publisher would either have to
             * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
            if (subscription != null) {
                withSubscriptionLock {
                    sub.cancel()
                }
                return
            }
            subscription = sub
            cont.invokeOnCancellation {
                withSubscriptionLock {
                    sub.cancel()
                }
            }
            withSubscriptionLock {
                sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE)
            }
        }

        override fun onNext(t: T) {
            val sub = subscription.let {
                if (it == null) {
                    /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */
                    handleCoroutineException(cont.context,
                        IllegalStateException("'onNext' was called before 'onSubscribe'"))
                    return
                } else {
                    it
                }
            }
            if (inTerminalState) {
                gotSignalInTerminalStateException(cont.context, "onNext")
                return
            }
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    if (seenValue) {
                        moreThanOneValueProvidedException(cont.context, mode)
                        return
                    }
                    seenValue = true
                    withSubscriptionLock {
                        sub.cancel()
                    }
                    cont.resume(t)
                }
                Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
                    if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
                        withSubscriptionLock {
                            sub.cancel()
                        }
                        /* the check for `cont.isActive` is needed in case `sub.cancel()` above calls `onComplete` or
                         `onError` on its own. */
                        if (cont.isActive) {
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                        }
                    } else {
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (!tryEnterTerminalState("onComplete")) {
                return
            }
            if (seenValue) {
                /* the check for `cont.isActive` is needed because, otherwise, if the publisher doesn't acknowledge the
                call to `cancel` for modes `SINGLE*` when more than one value was seen, it may call `onComplete`, and
                here `cont.resume` would fail. */
                if (mode != Mode.FIRST_OR_DEFAULT && mode != Mode.FIRST && cont.isActive) {
                    cont.resume(value as T)
                }
                return
            }
            when {
                (mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT) -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            /* the check for `cont.isActive` is needed because the publisher may still signal `onError` after the
            continuation was already resumed: in the `FIRST*` modes `onNext` resumes it right after calling `cancel`,
            and in the `SINGLE*` modes a second value resumes it with an exception. Resuming an already resumed
            continuation throws `IllegalStateException`, and letting that escape from `onError` would break rule
            2.13. Nobody is waiting for the result at that point, so the late error is dropped, just like a late
            `onComplete` is. */
            if (tryEnterTerminalState("onError") && cont.isActive) {
                cont.resumeWithException(e)
            }
        }

        /**
         * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete].
         *
         * Returns `true` if this is the first terminal signal; otherwise reports the violation and returns `false`.
         */
        private fun tryEnterTerminalState(signalName: String): Boolean {
            if (inTerminalState) {
                gotSignalInTerminalStateException(cont.context, signalName)
                return false
            }
            inTerminalState = true
            return true
        }

        /**
         * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially
         */
        @Synchronized
        private fun withSubscriptionLock(block: () -> Unit) {
            block()
        }
    })
}
310
/**
 * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): signals arriving after a terminal state was
 * reached are not processed.
 *
 * Passes an [IllegalStateException] naming the offending [signalName] to [handleCoroutineException] for the
 * given [context].
 */
private fun gotSignalInTerminalStateException(context: CoroutineContext, signalName: String) {
    val violation =
        IllegalStateException("'$signalName' was called after the publisher already signalled being in a terminal state")
    handleCoroutineException(context, violation)
}
318
/**
 * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested.
 *
 * Passes an [IllegalStateException] naming the offending [mode] to [handleCoroutineException] for the given
 * [context].
 */
private fun moreThanOneValueProvidedException(context: CoroutineContext, mode: Mode) {
    val violation =
        IllegalStateException("Only a single value was requested in '$mode', but the publisher provided more")
    handleCoroutineException(context, violation)
}
325