1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14
15 // ------------------------ CompletableSource ------------------------
16
17 /**
18 * Awaits for completion of this completable without blocking the thread.
19 * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
20 *
21 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
22 * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
23 * subscription.
24 */
25 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
26 subscribe(object : CompletableObserver {
27 override fun onSubscribe(d: Disposable) {
28 cont.disposeOnCancellation(d)
29 }
30
31 override fun onComplete() {
32 cont.resume(Unit)
33 }
34
35 override fun onError(e: Throwable) {
36 cont.resumeWithException(e)
37 }
38 })
39 }
40
41 // ------------------------ MaybeSource ------------------------
42
43 /**
44 * Awaits for completion of the [MaybeSource] without blocking the thread.
45 * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
46 * [MaybeSource] produces an error.
47 *
48 * This suspending function is cancellable.
49 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
50 * function immediately resumes with [CancellationException] and disposes of its subscription.
51 */
awaitSingleOrNullnull52 public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
53 subscribe(object : MaybeObserver<T> {
54 override fun onSubscribe(d: Disposable) {
55 cont.disposeOnCancellation(d)
56 }
57
58 override fun onComplete() {
59 cont.resume(null)
60 }
61
62 override fun onSuccess(t: T & Any) {
63 cont.resume(t)
64 }
65
66 override fun onError(error: Throwable) {
67 cont.resumeWithException(error)
68 }
69 })
70 }
71
72 /**
73 * Awaits for completion of the [MaybeSource] without blocking the thread.
74 * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
75 *
76 * This suspending function is cancellable.
77 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
78 * function immediately resumes with [CancellationException] and disposes of its subscription.
79 *
80 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
81 */
awaitSinglenull82 public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
83
84 /**
85 * Awaits for completion of the maybe without blocking a thread.
86 * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
87 * maybe had produced error.
88 *
89 * This suspending function is cancellable.
90 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
91 * immediately resumes with [CancellationException].
92 *
93 * ### Deprecation
94 *
95 * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
96 * a value, as opposed to throwing in such case.
97 * @suppress
98 */
99 @Deprecated(
100 message = "Deprecated in favor of awaitSingleOrNull()",
101 level = DeprecationLevel.HIDDEN,
102 replaceWith = ReplaceWith("this.awaitSingleOrNull()")
103 ) // Warning since 1.5, error in 1.6, hidden in 1.7
104 public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
105
106 /**
107 * Awaits for completion of the maybe without blocking a thread.
108 * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
109 * maybe had produced error.
110 *
111 * This suspending function is cancellable.
112 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
113 * immediately resumes with [CancellationException].
114 *
115 * ### Deprecation
116 *
117 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
118 * details).
119 * @suppress
120 */
121 @Deprecated(
122 message = "Deprecated in favor of awaitSingleOrNull()",
123 level = DeprecationLevel.HIDDEN,
124 replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
125 ) // Warning since 1.5, error in 1.6, hidden in 1.7
126 public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
127
128 // ------------------------ SingleSource ------------------------
129
130 /**
131 * Awaits for completion of the single value response without blocking the thread.
132 * Returns the resulting value, or throws the corresponding exception if this response produces an error.
133 *
134 * This suspending function is cancellable.
135 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
136 * function immediately disposes of its subscription and resumes with [CancellationException].
137 */
138 public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
139 subscribe(object : SingleObserver<T> {
140 override fun onSubscribe(d: Disposable) {
141 cont.disposeOnCancellation(d)
142 }
143
144 override fun onSuccess(t: T & Any) {
145 cont.resume(t)
146 }
147
148 override fun onError(error: Throwable) {
149 cont.resumeWithException(error)
150 }
151 })
152 }
153
154 // ------------------------ ObservableSource ------------------------
155
156 /**
157 * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
158 * if the observable has produced an error, throws the corresponding exception.
159 *
160 * This suspending function is cancellable.
161 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
162 * function immediately disposes of its subscription and resumes with [CancellationException].
163 *
164 * @throws NoSuchElementException if the observable does not emit any value
165 */
awaitFirstnull166 public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
167
168 /**
169 * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
170 * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
171 * corresponding exception.
172 *
173 * This suspending function is cancellable.
174 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
175 * function immediately disposes of its subscription and resumes with [CancellationException].
176 */
177 public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
178
179 /**
180 * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
181 * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
182 * exception.
183 *
184 * This suspending function is cancellable.
185 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
186 * function immediately disposes of its subscription and resumes with [CancellationException].
187 */
188 public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
189
190 /**
191 * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
192 * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
193 * the corresponding exception.
194 *
195 * This suspending function is cancellable.
196 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
197 * function immediately disposes of its subscription and resumes with [CancellationException].
198 */
199 public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
200 awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
201
202 /**
203 * Awaits the last value from the given [Observable] without blocking the thread and
204 * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
205 *
206 * This suspending function is cancellable.
207 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
208 * function immediately disposes of its subscription and resumes with [CancellationException].
209 *
210 * @throws NoSuchElementException if the observable does not emit any value
211 */
212 public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
213
214 /**
215 * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
216 * if this observable has produced an error, throws the corresponding exception.
217 *
218 * This suspending function is cancellable.
219 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
220 * function immediately disposes of its subscription and resumes with [CancellationException].
221 *
222 * @throws NoSuchElementException if the observable does not emit any value
223 * @throws IllegalArgumentException if the observable emits more than one value
224 */
225 public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
226
227 // ------------------------ private ------------------------
228
229 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
230 invokeOnCancellation { d.dispose() }
231
232 private enum class Mode(val s: String) {
233 FIRST("awaitFirst"),
234 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
235 LAST("awaitLast"),
236 SINGLE("awaitSingle");
toStringnull237 override fun toString(): String = s
238 }
239
240 private suspend fun <T> ObservableSource<T>.awaitOne(
241 mode: Mode,
242 default: T? = null
243 ): T = suspendCancellableCoroutine { cont ->
244 subscribe(object : Observer<T> {
245 private lateinit var subscription: Disposable
246 private var value: T? = null
247 private var seenValue = false
248
249 override fun onSubscribe(sub: Disposable) {
250 subscription = sub
251 cont.invokeOnCancellation { sub.dispose() }
252 }
253
254 override fun onNext(t: T & Any) {
255 when (mode) {
256 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
257 if (!seenValue) {
258 seenValue = true
259 cont.resume(t)
260 subscription.dispose()
261 }
262 }
263 Mode.LAST, Mode.SINGLE -> {
264 if (mode == Mode.SINGLE && seenValue) {
265 if (cont.isActive)
266 cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
267 subscription.dispose()
268 } else {
269 value = t
270 seenValue = true
271 }
272 }
273 }
274 }
275
276 @Suppress("UNCHECKED_CAST")
277 override fun onComplete() {
278 if (seenValue) {
279 if (cont.isActive) cont.resume(value as T)
280 return
281 }
282 when {
283 mode == Mode.FIRST_OR_DEFAULT -> {
284 cont.resume(default as T)
285 }
286 cont.isActive -> {
287 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
288 }
289 }
290 }
291
292 override fun onError(e: Throwable) {
293 cont.resumeWithException(e)
294 }
295 })
296 }
297
298