• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14 
15 // ------------------------ CompletableSource ------------------------
16 
/**
 * Suspends until this [CompletableSource] terminates, without blocking the calling thread.
 * Returns `Unit` on successful completion; if the source signals an error, that error is thrown to the caller.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while
 * this function is suspended, the subscription is disposed of and the function immediately resumes with
 * [CancellationException].
 */
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { continuation ->
    val observer = object : CompletableObserver {
        override fun onSubscribe(d: Disposable) {
            // Tie the subscription to the continuation so that cancelling the coroutine disposes of it.
            continuation.disposeOnCancellation(d)
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }

        override fun onComplete() {
            continuation.resume(Unit)
        }
    }
    subscribe(observer)
}
40 
41 // ------------------------ MaybeSource ------------------------
42 
/**
 * Suspends until this [MaybeSource] terminates, without blocking the calling thread.
 * Returns the value delivered through `onSuccess`, or `null` if the source completes without a value.
 * If the source signals an error, that error is thrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 */
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { continuation ->
    val observer = object : MaybeObserver<T> {
        override fun onSubscribe(d: Disposable) {
            // Tie the subscription to the continuation so that cancelling the coroutine disposes of it.
            continuation.disposeOnCancellation(d)
        }

        override fun onSuccess(t: T & Any) {
            continuation.resume(t)
        }

        override fun onComplete() {
            // Completion without a preceding onSuccess means the source was empty.
            continuation.resume(null)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }
    }
    subscribe(observer)
}
71 
/**
 * Suspends until this [MaybeSource] terminates, without blocking the calling thread, and returns its value.
 * Throws if the source completes without producing a value or if it signals an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
 */
public suspend fun <T> MaybeSource<T>.awaitSingle(): T {
    val result = awaitSingleOrNull()
    return result ?: throw NoSuchElementException()
}
83 
/**
 * Suspends until this maybe terminates, without blocking a thread.
 * Returns the produced value, `null` if the maybe completed without a value, or throws the corresponding exception
 * if the maybe produced an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull], whose name makes it explicit that `null` is returned to denote the
 * absence of a value, as opposed to throwing in that case. This function simply delegates to it.
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? {
    return awaitSingleOrNull()
}
105 
/**
 * Suspends until this maybe terminates, without blocking a thread.
 * Returns the produced value, [default] if the maybe completed without a value, or throws the corresponding
 * exception if the maybe produced an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
 * details). The replacement is `awaitSingleOrNull() ?: default`.
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T {
    val result = awaitSingleOrNull()
    return result ?: default
}
127 
128 // ------------------------ SingleSource ------------------------
129 
/**
 * Suspends until this [SingleSource] delivers its value, without blocking the calling thread.
 * Returns the value passed to `onSuccess`, or throws the corresponding exception if the source signals an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 */
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { continuation ->
    val observer = object : SingleObserver<T> {
        override fun onSubscribe(d: Disposable) {
            // Tie the subscription to the continuation so that cancelling the coroutine disposes of it.
            continuation.disposeOnCancellation(d)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }

        override fun onSuccess(t: T & Any) {
            continuation.resume(t)
        }
    }
    subscribe(observer)
}
153 
154 // ------------------------ ObservableSource ------------------------
155 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value, and returns that value.
 * If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitFirst(): T {
    return awaitOne(mode = Mode.FIRST)
}
167 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value, and returns that value.
 * If the observable completes without emitting anything, [default] is returned instead.
 * If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT, default = default)
}
178 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value, and returns that value.
 * If the observable completes without emitting anything, `null` is returned instead.
 * If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? {
    // No explicit default is passed, so an empty observable yields awaitOne's default of `null`.
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
189 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value, and returns that value.
 * If the observable completes without emitting anything, [defaultValue] is called and its result is returned.
 * If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // An empty observable makes awaitOne resume with its `null` default, which triggers the fallback below.
    val first = awaitOne(mode = Mode.FIRST_OR_DEFAULT)
    return first ?: defaultValue()
}
201 
/**
 * Suspends, without blocking the thread, until the given [Observable] completes, and returns the last value it
 * emitted. If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitLast(): T {
    return awaitOne(mode = Mode.LAST)
}
213 
/**
 * Suspends, without blocking the thread, until the given observable completes, and returns the one value it emitted.
 * If the observable produces an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, the subscription
 * is disposed of and the function immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 * @throws IllegalArgumentException if the observable emits more than one value
 */
public suspend fun <T> ObservableSource<T>.awaitSingle(): T {
    return awaitOne(mode = Mode.SINGLE)
}
226 
227 // ------------------------ private ------------------------
228 
/**
 * Registers a cancellation handler on this continuation that disposes of [d].
 */
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) {
    invokeOnCancellation { d.dispose() }
}
231 
/**
 * Selects which element of an observable is awaited.
 *
 * Each constant carries in [s] the name of an `await*` function; [toString] returns that name, so interpolating a
 * mode into a string yields the function name rather than the constant name.
 */
private enum class Mode(val s: String) {
    FIRST("awaitFirst"),
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),
    LAST("awaitLast"),
    SINGLE("awaitSingle");

    override fun toString(): String {
        return s
    }
}
239 
/**
 * Subscribes to this [ObservableSource] and suspends until the element selected by [mode] is known.
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] resume with the first element and then dispose of the subscription.
 * - [Mode.LAST] remembers the most recent element and resumes with it when the source completes.
 * - [Mode.SINGLE] behaves like [Mode.LAST], except that a second element resumes the caller with
 *   [IllegalArgumentException] and disposes of the subscription.
 *
 * If the source completes without emitting anything, [Mode.FIRST_OR_DEFAULT] resumes with [default], while every
 * other mode resumes with [NoSuchElementException]. An error signalled by the source is passed on to the caller.
 * Cancelling the continuation disposes of the subscription.
 *
 * @param mode which element to wait for; its string form is embedded into exception messages.
 * @param default the value to resume with in [Mode.FIRST_OR_DEFAULT] when nothing was emitted.
 */
private suspend fun <T> ObservableSource<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    subscribe(object : Observer<T> {
        private lateinit var subscription: Disposable
        private var value: T? = null
        private var seenValue = false

        override fun onSubscribe(sub: Disposable) {
            subscription = sub
            // Use the same helper as the Completable/Maybe/Single observers in this file.
            cont.disposeOnCancellation(sub)
        }

        override fun onNext(t: T & Any) {
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    // Only the very first element matters: hand it over and stop listening.
                    if (!seenValue) {
                        seenValue = true
                        cont.resume(t)
                        subscription.dispose()
                    }
                }
                Mode.LAST, Mode.SINGLE -> {
                    if (mode == Mode.SINGLE && seenValue) {
                        // A second element violates the SINGLE contract; skip the resume if the continuation
                        // is no longer active.
                        if (cont.isActive)
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                        subscription.dispose()
                    } else {
                        // Keep the latest element; it is delivered in onComplete.
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (seenValue) {
                // In the FIRST modes the continuation was already resumed from onNext, hence the isActive guard.
                if (cont.isActive) cont.resume(value as T)
                return
            }
            // Nothing was emitted.
            when {
                mode == Mode.FIRST_OR_DEFAULT -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            cont.resumeWithException(e)
        }
    })
}
297 
298