1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx3
6
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14
15 // ------------------------ CompletableSource ------------------------
16
17 /**
18 * Awaits for completion of this completable without blocking the thread.
19 * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
20 *
21 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
22 * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
23 * subscription.
24 */
25 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
26 subscribe(object : CompletableObserver {
27 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
28 override fun onComplete() { cont.resume(Unit) }
29 override fun onError(e: Throwable) { cont.resumeWithException(e) }
30 })
31 }
32
33 // ------------------------ MaybeSource ------------------------
34
35 /**
36 * Awaits for completion of the [MaybeSource] without blocking the thread.
37 * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
38 * [MaybeSource] produces an error.
39 *
40 * This suspending function is cancellable.
41 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
42 * function immediately resumes with [CancellationException] and disposes of its subscription.
43 */
44 public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
45 subscribe(object : MaybeObserver<T & Any> {
onSubscribenull46 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
onCompletenull47 override fun onComplete() { cont.resume(null) }
48 override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull49 override fun onError(error: Throwable) { cont.resumeWithException(error) }
50 })
51 }
52
53 /**
54 * Awaits for completion of the [MaybeSource] without blocking the thread.
55 * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
56 *
57 * This suspending function is cancellable.
58 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
59 * function immediately resumes with [CancellationException] and disposes of its subscription.
60 *
61 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
62 */
63 public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
64
65 /**
66 * Awaits for completion of the maybe without blocking a thread.
67 * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
68 * maybe had produced error.
69 *
70 * This suspending function is cancellable.
71 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
72 * immediately resumes with [CancellationException].
73 *
74 * ### Deprecation
75 *
76 * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
77 * a value, as opposed to throwing in such case.
78 *
79 * @suppress
80 */
81 @Deprecated(
82 message = "Deprecated in favor of awaitSingleOrNull()",
83 level = DeprecationLevel.HIDDEN,
84 replaceWith = ReplaceWith("this.awaitSingleOrNull()")
85 ) // Warning since 1.5, error in 1.6, hidden in 1.7
86 public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()
87
88 /**
89 * Awaits for completion of the maybe without blocking a thread.
90 * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
91 * maybe had produced error.
92 *
93 * This suspending function is cancellable.
94 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
95 * immediately resumes with [CancellationException].
96 *
97 * ### Deprecation
98 *
99 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
100 * details).
101 *
102 * @suppress
103 */
104 @Deprecated(
105 message = "Deprecated in favor of awaitSingleOrNull()",
106 level = DeprecationLevel.HIDDEN,
107 replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
108 ) // Warning since 1.5, error in 1.6, hidden in 1.7
109 public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
110
111 // ------------------------ SingleSource ------------------------
112
113 /**
114 * Awaits for completion of the single value response without blocking the thread.
115 * Returns the resulting value, or throws the corresponding exception if this response produces an error.
116 *
117 * This suspending function is cancellable.
118 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
119 * function immediately disposes of its subscription and resumes with [CancellationException].
120 */
121 public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
122 subscribe(object : SingleObserver<T & Any> {
onSubscribenull123 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
124 override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull125 override fun onError(error: Throwable) { cont.resumeWithException(error) }
126 })
127 }
128
129 // ------------------------ ObservableSource ------------------------
130
131 /**
132 * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
133 * if the observable has produced an error, throws the corresponding exception.
134 *
135 * This suspending function is cancellable.
136 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
137 * function immediately disposes of its subscription and resumes with [CancellationException].
138 *
139 * @throws NoSuchElementException if the observable does not emit any value
140 */
141 @Suppress("UNCHECKED_CAST")
142 public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T
143
144 /**
145 * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
146 * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
147 * corresponding exception.
148 *
149 * This suspending function is cancellable.
150 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
151 * function immediately disposes of its subscription and resumes with [CancellationException].
152 */
153 @Suppress("UNCHECKED_CAST")
154 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
155 awaitOne(Mode.FIRST_OR_DEFAULT, default) as T
156
157 /**
158 * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
159 * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
160 * exception.
161 *
162 * This suspending function is cancellable.
163 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
164 * function immediately disposes of its subscription and resumes with [CancellationException].
165 */
166 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
167
168 /**
169 * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
170 * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
171 * the corresponding exception.
172 *
173 * This suspending function is cancellable.
174 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
175 * function immediately disposes of its subscription and resumes with [CancellationException].
176 */
177 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
178 awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
179
180 /**
181 * Awaits the last value from the given [Observable] without blocking the thread and
182 * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
183 *
184 * This suspending function is cancellable.
185 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
186 * function immediately disposes of its subscription and resumes with [CancellationException].
187 *
188 * @throws NoSuchElementException if the observable does not emit any value
189 */
190 @Suppress("UNCHECKED_CAST")
191 public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T
192
193 /**
194 * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
195 * if this observable has produced an error, throws the corresponding exception.
196 *
197 * This suspending function is cancellable.
198 * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
199 * function immediately disposes of its subscription and resumes with [CancellationException].
200 *
201 * @throws NoSuchElementException if the observable does not emit any value
202 * @throws IllegalArgumentException if the observable emits more than one value
203 */
204 @Suppress("UNCHECKED_CAST")
205 public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T
206
207 // ------------------------ private ------------------------
208
disposeOnCancellationnull209 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
210 invokeOnCancellation { d.dispose() }
211
212 private enum class Mode(@JvmField val s: String) {
213 FIRST("awaitFirst"),
214 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
215 LAST("awaitLast"),
216 SINGLE("awaitSingle");
toStringnull217 override fun toString(): String = s
218 }
219
220 private suspend fun <T> ObservableSource<T & Any>.awaitOne(
221 mode: Mode,
222 default: T? = null
223 ): T? = suspendCancellableCoroutine { cont ->
224 subscribe(object : Observer<T & Any> {
225 private lateinit var subscription: Disposable
226 private var value: T? = null
227 private var seenValue = false
228
229 override fun onSubscribe(sub: Disposable) {
230 subscription = sub
231 cont.invokeOnCancellation { sub.dispose() }
232 }
233
234 override fun onNext(t: T & Any) {
235 when (mode) {
236 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
237 if (!seenValue) {
238 seenValue = true
239 cont.resume(t)
240 subscription.dispose()
241 }
242 }
243 Mode.LAST, Mode.SINGLE -> {
244 if (mode == Mode.SINGLE && seenValue) {
245 if (cont.isActive)
246 cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
247 subscription.dispose()
248 } else {
249 value = t
250 seenValue = true
251 }
252 }
253 }
254 }
255
256 @Suppress("UNCHECKED_CAST")
257 override fun onComplete() {
258 if (seenValue) {
259 if (cont.isActive) cont.resume(value as T)
260 return
261 }
262 when {
263 mode == Mode.FIRST_OR_DEFAULT -> {
264 cont.resume(default as T)
265 }
266 cont.isActive -> {
267 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
268 }
269 }
270 }
271
272 override fun onError(e: Throwable) {
273 cont.resumeWithException(e)
274 }
275 })
276 }
277
278