/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.rx3
6
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14
15 // ------------------------ CompletableSource ------------------------
16
/**
 * Suspends the calling coroutine until this [CompletableSource] terminates, without blocking the thread.
 * Returns `Unit` once the source signals completion, or rethrows the error signalled by the source.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while
 * this function is suspended, it immediately resumes with [CancellationException] and disposes of its subscription.
 */
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { continuation ->
    val observer = object : CompletableObserver {
        override fun onSubscribe(d: Disposable) {
            // Dispose of the subscription as soon as the continuation gets cancelled.
            continuation.disposeOnCancellation(d)
        }

        override fun onComplete() {
            continuation.resume(Unit)
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }
    }
    subscribe(observer)
}
32
33 // ------------------------ MaybeSource ------------------------
34
/**
 * Suspends until this [MaybeSource] terminates, without blocking the thread.
 * Yields the emitted value on success, `null` when the source completes without a value, and rethrows the error
 * if the source signals one.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException] and disposes of its subscription.
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { continuation ->
    val observer = object : MaybeObserver<T> {
        override fun onSubscribe(d: Disposable) {
            // Dispose of the subscription as soon as the continuation gets cancelled.
            continuation.disposeOnCancellation(d)
        }

        override fun onSuccess(t: T) {
            continuation.resume(t)
        }

        override fun onComplete() {
            // Completion without a value is reported to the caller as `null`.
            continuation.resume(null)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }
    }
    subscribe(observer)
}
53
/**
 * Suspends until this [MaybeSource] terminates, without blocking the thread, and returns the value it produced.
 * Throws when the source completes without a value or signals an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException] and disposes of its subscription.
 *
 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
 */
public suspend fun <T> MaybeSource<T>.awaitSingle(): T {
    val result = awaitSingleOrNull()
    // A `null` result means the source produced no value.
    return result ?: throw NoSuchElementException()
}
65
/**
 * Suspends until this maybe terminates, without blocking a thread.
 * Yields the produced value, `null` when no value was produced, or rethrows the error signalled by the maybe.
 * This function simply delegates to [awaitSingleOrNull].
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull], whose name makes it explicit that `null` is returned to denote the
 * absence of a value, as opposed to throwing in such a case.
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? {
    return awaitSingleOrNull()
}
88
/**
 * Suspends until this maybe terminates, without blocking a thread.
 * Yields the produced value, [default] when no value was produced, or rethrows the error signalled by the maybe.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
 * details).
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T {
    val result = awaitSingleOrNull()
    // Fall back to the caller-supplied value when the maybe produced nothing.
    return result ?: default
}
111
112 // ------------------------ SingleSource ------------------------
113
/**
 * Suspends until this [SingleSource] delivers its response, without blocking the thread.
 * Returns the resulting value, or rethrows the error if the source signals one.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { continuation ->
    val observer = object : SingleObserver<T> {
        override fun onSubscribe(d: Disposable) {
            // Dispose of the subscription as soon as the continuation gets cancelled.
            continuation.disposeOnCancellation(d)
        }

        override fun onSuccess(t: T) {
            continuation.resume(t)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }
    }
    subscribe(observer)
}
129
130 // ------------------------ ObservableSource ------------------------
131
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns that value.
 * If the observable signals an error instead, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitFirst(): T {
    return awaitOne(mode = Mode.FIRST)
}
143
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns that value.
 * If the observable completes without emitting anything, [default] is returned instead; if it signals an error,
 * the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT, default = default)
}
154
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns that value.
 * If the observable completes without emitting anything, `null` is returned instead; if it signals an error,
 * the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? {
    // No explicit default is passed, so the fallback value is `null`.
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
165
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns that value.
 * If the observable completes without emitting anything, [defaultValue] is invoked to obtain the result; if the
 * observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    val first: T? = awaitOne(mode = Mode.FIRST_OR_DEFAULT)
    // The fallback is computed lazily, only when no value was obtained.
    return first ?: defaultValue()
}
177
/**
 * Suspends, without blocking the thread, until the given [Observable] completes and returns the last value it
 * emitted. If the observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitLast(): T {
    return awaitOne(mode = Mode.LAST)
}
189
/**
 * Suspends, without blocking the thread, until the given observable completes and returns the single value it
 * emitted. If the observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 * @throws IllegalArgumentException if the observable emits more than one value
 */
public suspend fun <T> ObservableSource<T>.awaitSingle(): T {
    return awaitOne(mode = Mode.SINGLE)
}
202
203 // ------------------------ private ------------------------
204
/**
 * Registers a cancellation handler on this continuation that disposes of [d].
 */
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) {
    invokeOnCancellation { d.dispose() }
}
207
/**
 * Selects which value `awaitOne` hands back to its caller.
 *
 * [s] holds the name of the corresponding public function; it is what [toString] returns, so it ends up in the
 * exception messages that interpolate the mode.
 */
private enum class Mode(val s: String) {
    FIRST("awaitFirst"),
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),
    LAST("awaitLast"),
    SINGLE("awaitSingle");

    override fun toString(): String {
        return s
    }
}
215
/**
 * Shared implementation behind the `ObservableSource.await*` functions: subscribes to this source and resumes the
 * suspended caller according to [mode].
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] resume with the first emitted value and then dispose of the
 *   subscription.
 * - [Mode.LAST] remembers every emitted value and resumes with the most recent one on completion.
 * - [Mode.SINGLE] behaves like [Mode.LAST], except that a second emission resumes the caller with
 *   [IllegalArgumentException] and disposes of the subscription.
 *
 * When the source completes without having emitted anything, [Mode.FIRST_OR_DEFAULT] resumes with [default], while
 * every other mode resumes with [NoSuchElementException]. An error signalled by the source is passed on to the
 * caller as is. Cancelling the continuation disposes of the subscription.
 *
 * @param mode which emitted value to deliver.
 * @param default the value delivered in [Mode.FIRST_OR_DEFAULT] when nothing was emitted.
 */
private suspend fun <T> ObservableSource<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { continuation ->
    val observer = object : Observer<T> {
        private lateinit var upstream: Disposable
        private var latest: T? = null
        private var hasValue = false

        override fun onSubscribe(sub: Disposable) {
            upstream = sub
            continuation.invokeOnCancellation { sub.dispose() }
        }

        override fun onNext(t: T) {
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> deliverFirst(t)
                Mode.SINGLE -> if (hasValue) rejectExtraValue() else remember(t)
                Mode.LAST -> remember(t)
            }
        }

        /** Resumes with the first emission and stops listening; any later emission is ignored. */
        private fun deliverFirst(item: T) {
            if (hasValue) return
            hasValue = true
            continuation.resume(item)
            upstream.dispose()
        }

        /** Stores [item] so that it can be delivered once the source completes. */
        private fun remember(item: T) {
            latest = item
            hasValue = true
        }

        /** Fails the caller because a second value arrived where only one is allowed. */
        private fun rejectExtraValue() {
            if (continuation.isActive) {
                continuation.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
            }
            upstream.dispose()
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (hasValue) {
                // The isActive check skips the resume when the continuation is no longer active.
                if (continuation.isActive) continuation.resume(latest as T)
            } else if (mode == Mode.FIRST_OR_DEFAULT) {
                continuation.resume(default as T)
            } else if (continuation.isActive) {
                continuation.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
            }
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }
    }
    subscribe(observer)
}
273
274