<lambda>null1 package kotlinx.coroutines.rx2
2
3 import io.reactivex.*
4 import io.reactivex.disposables.Disposable
5 import kotlinx.coroutines.CancellableContinuation
6 import kotlinx.coroutines.CancellationException
7 import kotlinx.coroutines.Job
8 import kotlinx.coroutines.suspendCancellableCoroutine
9 import kotlin.coroutines.*
10
11 // ------------------------ CompletableSource ------------------------
12
13 /**
14 * Awaits for completion of this completable without blocking the thread.
15 * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
16 *
17 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled while this
18 * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
19 * subscription.
20 */
21 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
22 subscribe(object : CompletableObserver {
23 override fun onSubscribe(d: Disposable) {
24 cont.disposeOnCancellation(d)
25 }
26
27 override fun onComplete() {
28 cont.resume(Unit)
29 }
30
31 override fun onError(e: Throwable) {
32 cont.resumeWithException(e)
33 }
34 })
35 }
36
37 // ------------------------ MaybeSource ------------------------
38
39 /**
40 * Awaits for completion of the [MaybeSource] without blocking the thread.
41 * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
42 * [MaybeSource] produces an error.
43 *
44 * This suspending function is cancellable.
45 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
46 * function immediately resumes with [CancellationException] and disposes of its subscription.
47 */
awaitSingleOrNullnull48 public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
49 subscribe(object : MaybeObserver<T> {
50 override fun onSubscribe(d: Disposable) {
51 cont.disposeOnCancellation(d)
52 }
53
54 override fun onComplete() {
55 cont.resume(null)
56 }
57
58 override fun onSuccess(t: T & Any) {
59 cont.resume(t)
60 }
61
62 override fun onError(error: Throwable) {
63 cont.resumeWithException(error)
64 }
65 })
66 }
67
68 /**
69 * Awaits for completion of the [MaybeSource] without blocking the thread.
70 * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
71 *
72 * This suspending function is cancellable.
73 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
74 * function immediately resumes with [CancellationException] and disposes of its subscription.
75 *
76 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
77 */
awaitSinglenull78 public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
79
80 /**
81 * Awaits for completion of the maybe without blocking a thread.
82 * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
83 * maybe had produced error.
84 *
85 * This suspending function is cancellable.
86 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
87 * immediately resumes with [CancellationException].
88 *
89 * ### Deprecation
90 *
91 * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
92 * a value, as opposed to throwing in such case.
93 * @suppress
94 */
95 @Deprecated(
96 message = "Deprecated in favor of awaitSingleOrNull()",
97 level = DeprecationLevel.HIDDEN,
98 replaceWith = ReplaceWith("this.awaitSingleOrNull()")
99 ) // Warning since 1.5, error in 1.6, hidden in 1.7
100 public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
101
102 /**
103 * Awaits for completion of the maybe without blocking a thread.
104 * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
105 * maybe had produced error.
106 *
107 * This suspending function is cancellable.
108 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
109 * immediately resumes with [CancellationException].
110 *
111 * ### Deprecation
112 *
113 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
114 * details).
115 * @suppress
116 */
117 @Deprecated(
118 message = "Deprecated in favor of awaitSingleOrNull()",
119 level = DeprecationLevel.HIDDEN,
120 replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
121 ) // Warning since 1.5, error in 1.6, hidden in 1.7
122 public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
123
124 // ------------------------ SingleSource ------------------------
125
126 /**
127 * Awaits for completion of the single value response without blocking the thread.
128 * Returns the resulting value, or throws the corresponding exception if this response produces an error.
129 *
130 * This suspending function is cancellable.
131 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
132 * function immediately disposes of its subscription and resumes with [CancellationException].
133 */
134 public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
135 subscribe(object : SingleObserver<T> {
136 override fun onSubscribe(d: Disposable) {
137 cont.disposeOnCancellation(d)
138 }
139
140 override fun onSuccess(t: T & Any) {
141 cont.resume(t)
142 }
143
144 override fun onError(error: Throwable) {
145 cont.resumeWithException(error)
146 }
147 })
148 }
149
150 // ------------------------ ObservableSource ------------------------
151
152 /**
153 * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
154 * if the observable has produced an error, throws the corresponding exception.
155 *
156 * This suspending function is cancellable.
157 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
158 * function immediately disposes of its subscription and resumes with [CancellationException].
159 *
160 * @throws NoSuchElementException if the observable does not emit any value
161 */
awaitFirstnull162 public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
163
164 /**
165 * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
166 * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
167 * corresponding exception.
168 *
169 * This suspending function is cancellable.
170 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
171 * function immediately disposes of its subscription and resumes with [CancellationException].
172 */
173 public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
174
175 /**
176 * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
177 * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
178 * exception.
179 *
180 * This suspending function is cancellable.
181 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
182 * function immediately disposes of its subscription and resumes with [CancellationException].
183 */
184 public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
185
186 /**
187 * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
188 * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
189 * the corresponding exception.
190 *
191 * This suspending function is cancellable.
192 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
193 * function immediately disposes of its subscription and resumes with [CancellationException].
194 */
195 public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
196 awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
197
198 /**
199 * Awaits the last value from the given [Observable] without blocking the thread and
200 * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
201 *
202 * This suspending function is cancellable.
203 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
204 * function immediately disposes of its subscription and resumes with [CancellationException].
205 *
206 * @throws NoSuchElementException if the observable does not emit any value
207 */
208 public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
209
210 /**
211 * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
212 * if this observable has produced an error, throws the corresponding exception.
213 *
214 * This suspending function is cancellable.
215 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
216 * function immediately disposes of its subscription and resumes with [CancellationException].
217 *
218 * @throws NoSuchElementException if the observable does not emit any value
219 * @throws IllegalArgumentException if the observable emits more than one value
220 */
221 public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
222
223 // ------------------------ private ------------------------
224
225 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
226 invokeOnCancellation { d.dispose() }
227
228 private enum class Mode(val s: String) {
229 FIRST("awaitFirst"),
230 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
231 LAST("awaitLast"),
232 SINGLE("awaitSingle");
toStringnull233 override fun toString(): String = s
234 }
235
236 private suspend fun <T> ObservableSource<T>.awaitOne(
237 mode: Mode,
238 default: T? = null
239 ): T = suspendCancellableCoroutine { cont ->
240 subscribe(object : Observer<T> {
241 private lateinit var subscription: Disposable
242 private var value: T? = null
243 private var seenValue = false
244
245 override fun onSubscribe(sub: Disposable) {
246 subscription = sub
247 cont.invokeOnCancellation { sub.dispose() }
248 }
249
250 override fun onNext(t: T & Any) {
251 when (mode) {
252 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
253 if (!seenValue) {
254 seenValue = true
255 cont.resume(t)
256 subscription.dispose()
257 }
258 }
259 Mode.LAST, Mode.SINGLE -> {
260 if (mode == Mode.SINGLE && seenValue) {
261 if (cont.isActive)
262 cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
263 subscription.dispose()
264 } else {
265 value = t
266 seenValue = true
267 }
268 }
269 }
270 }
271
272 @Suppress("UNCHECKED_CAST")
273 override fun onComplete() {
274 if (seenValue) {
275 if (cont.isActive) cont.resume(value as T)
276 return
277 }
278 when {
279 mode == Mode.FIRST_OR_DEFAULT -> {
280 cont.resume(default as T)
281 }
282 cont.isActive -> {
283 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
284 }
285 }
286 }
287
288 override fun onError(e: Throwable) {
289 cont.resumeWithException(e)
290 }
291 })
292 }
293
294