<lambda>null1 package kotlinx.coroutines.rx3
2
3 import io.reactivex.rxjava3.core.*
4 import io.reactivex.rxjava3.disposables.Disposable
5 import kotlinx.coroutines.CancellableContinuation
6 import kotlinx.coroutines.CancellationException
7 import kotlinx.coroutines.Job
8 import kotlinx.coroutines.suspendCancellableCoroutine
9 import kotlin.coroutines.*
10
11 // ------------------------ CompletableSource ------------------------
12
13 /**
14 * Awaits for completion of this completable without blocking the thread.
15 * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
16 *
17 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled while this
18 * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
19 * subscription.
20 */
21 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
22 subscribe(object : CompletableObserver {
23 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
24 override fun onComplete() { cont.resume(Unit) }
25 override fun onError(e: Throwable) { cont.resumeWithException(e) }
26 })
27 }
28
29 // ------------------------ MaybeSource ------------------------
30
31 /**
32 * Awaits for completion of the [MaybeSource] without blocking the thread.
33 * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
34 * [MaybeSource] produces an error.
35 *
36 * This suspending function is cancellable.
37 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
38 * function immediately resumes with [CancellationException] and disposes of its subscription.
39 */
40 public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
41 subscribe(object : MaybeObserver<T & Any> {
onSubscribenull42 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
onCompletenull43 override fun onComplete() { cont.resume(null) }
44 override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull45 override fun onError(error: Throwable) { cont.resumeWithException(error) }
46 })
47 }
48
49 /**
50 * Awaits for completion of the [MaybeSource] without blocking the thread.
51 * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
52 *
53 * This suspending function is cancellable.
54 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
55 * function immediately resumes with [CancellationException] and disposes of its subscription.
56 *
57 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
58 */
59 public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
60
61 /**
62 * Awaits for completion of the maybe without blocking a thread.
63 * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
64 * maybe had produced error.
65 *
66 * This suspending function is cancellable.
67 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
68 * immediately resumes with [CancellationException].
69 *
70 * ### Deprecation
71 *
72 * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
73 * a value, as opposed to throwing in such case.
74 *
75 * @suppress
76 */
77 @Deprecated(
78 message = "Deprecated in favor of awaitSingleOrNull()",
79 level = DeprecationLevel.HIDDEN,
80 replaceWith = ReplaceWith("this.awaitSingleOrNull()")
81 ) // Warning since 1.5, error in 1.6, hidden in 1.7
82 public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()
83
84 /**
85 * Awaits for completion of the maybe without blocking a thread.
86 * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
87 * maybe had produced error.
88 *
89 * This suspending function is cancellable.
90 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
91 * immediately resumes with [CancellationException].
92 *
93 * ### Deprecation
94 *
95 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
96 * details).
97 *
98 * @suppress
99 */
100 @Deprecated(
101 message = "Deprecated in favor of awaitSingleOrNull()",
102 level = DeprecationLevel.HIDDEN,
103 replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
104 ) // Warning since 1.5, error in 1.6, hidden in 1.7
105 public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
106
107 // ------------------------ SingleSource ------------------------
108
109 /**
110 * Awaits for completion of the single value response without blocking the thread.
111 * Returns the resulting value, or throws the corresponding exception if this response produces an error.
112 *
113 * This suspending function is cancellable.
114 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
115 * function immediately disposes of its subscription and resumes with [CancellationException].
116 */
117 public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
118 subscribe(object : SingleObserver<T & Any> {
onSubscribenull119 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
120 override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull121 override fun onError(error: Throwable) { cont.resumeWithException(error) }
122 })
123 }
124
125 // ------------------------ ObservableSource ------------------------
126
127 /**
128 * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
129 * if the observable has produced an error, throws the corresponding exception.
130 *
131 * This suspending function is cancellable.
132 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
133 * function immediately disposes of its subscription and resumes with [CancellationException].
134 *
135 * @throws NoSuchElementException if the observable does not emit any value
136 */
137 @Suppress("UNCHECKED_CAST")
138 public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T
139
140 /**
141 * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
142 * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
143 * corresponding exception.
144 *
145 * This suspending function is cancellable.
146 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
147 * function immediately disposes of its subscription and resumes with [CancellationException].
148 */
149 @Suppress("UNCHECKED_CAST")
150 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
151 awaitOne(Mode.FIRST_OR_DEFAULT, default) as T
152
153 /**
154 * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
155 * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
156 * exception.
157 *
158 * This suspending function is cancellable.
159 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
160 * function immediately disposes of its subscription and resumes with [CancellationException].
161 */
162 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
163
164 /**
165 * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
166 * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
167 * the corresponding exception.
168 *
169 * This suspending function is cancellable.
170 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
171 * function immediately disposes of its subscription and resumes with [CancellationException].
172 */
173 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
174 awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
175
176 /**
177 * Awaits the last value from the given [Observable] without blocking the thread and
178 * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
179 *
180 * This suspending function is cancellable.
181 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
182 * function immediately disposes of its subscription and resumes with [CancellationException].
183 *
184 * @throws NoSuchElementException if the observable does not emit any value
185 */
186 @Suppress("UNCHECKED_CAST")
187 public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T
188
189 /**
190 * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
191 * if this observable has produced an error, throws the corresponding exception.
192 *
193 * This suspending function is cancellable.
194 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
195 * function immediately disposes of its subscription and resumes with [CancellationException].
196 *
197 * @throws NoSuchElementException if the observable does not emit any value
198 * @throws IllegalArgumentException if the observable emits more than one value
199 */
200 @Suppress("UNCHECKED_CAST")
201 public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T
202
203 // ------------------------ private ------------------------
204
disposeOnCancellationnull205 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
206 invokeOnCancellation { d.dispose() }
207
208 private enum class Mode(@JvmField val s: String) {
209 FIRST("awaitFirst"),
210 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
211 LAST("awaitLast"),
212 SINGLE("awaitSingle");
toStringnull213 override fun toString(): String = s
214 }
215
216 private suspend fun <T> ObservableSource<T & Any>.awaitOne(
217 mode: Mode,
218 default: T? = null
219 ): T? = suspendCancellableCoroutine { cont ->
220 subscribe(object : Observer<T & Any> {
221 private lateinit var subscription: Disposable
222 private var value: T? = null
223 private var seenValue = false
224
225 override fun onSubscribe(sub: Disposable) {
226 subscription = sub
227 cont.invokeOnCancellation { sub.dispose() }
228 }
229
230 override fun onNext(t: T & Any) {
231 when (mode) {
232 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
233 if (!seenValue) {
234 seenValue = true
235 cont.resume(t)
236 subscription.dispose()
237 }
238 }
239 Mode.LAST, Mode.SINGLE -> {
240 if (mode == Mode.SINGLE && seenValue) {
241 if (cont.isActive)
242 cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
243 subscription.dispose()
244 } else {
245 value = t
246 seenValue = true
247 }
248 }
249 }
250 }
251
252 @Suppress("UNCHECKED_CAST")
253 override fun onComplete() {
254 if (seenValue) {
255 if (cont.isActive) cont.resume(value as T)
256 return
257 }
258 when {
259 mode == Mode.FIRST_OR_DEFAULT -> {
260 cont.resume(default as T)
261 }
262 cont.isActive -> {
263 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
264 }
265 }
266 }
267
268 override fun onError(e: Throwable) {
269 cont.resumeWithException(e)
270 }
271 })
272 }
273
274