• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14 
15 // ------------------------ CompletableSource ------------------------
16 
/**
 * Suspends the caller until this [CompletableSource] terminates, without blocking the thread.
 * Completes normally with `Unit` on successful completion, or rethrows the error signalled by the source.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while
 * this function is suspended, it immediately resumes with [CancellationException] and the subscription is disposed.
 */
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { continuation ->
    val observer = object : CompletableObserver {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) {
            continuation.disposeOnCancellation(d)
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }

        override fun onComplete() {
            continuation.resume(Unit)
        }
    }
    subscribe(observer)
}
32 
33 // ------------------------ MaybeSource ------------------------
34 
/**
 * Suspends the caller until this [MaybeSource] terminates, without blocking the thread.
 * Returns the emitted value, returns `null` when the source completes without a value, and rethrows the error
 * if the source signals one.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException] and the subscription is disposed.
 */
public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { continuation ->
    val observer = object : MaybeObserver<T & Any> {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) {
            continuation.disposeOnCancellation(d)
        }

        override fun onSuccess(t: T & Any) {
            continuation.resume(t)
        }

        // Completion without a value is reported to the caller as `null`.
        override fun onComplete() {
            continuation.resume(null)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }
    }
    subscribe(observer)
}
52 
/**
 * Suspends the caller until this [MaybeSource] terminates, without blocking the thread.
 * Returns the emitted value; throws when the source completes empty or signals an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * resumes with [CancellationException] and the subscription is disposed.
 *
 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
 */
public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T {
    val result: T? = awaitSingleOrNull()
    // `null` here means the source completed without emitting a value.
    return result ?: throw NoSuchElementException()
}
64 
/**
 * Suspends the caller until this maybe terminates, without blocking a thread.
 * Returns the emitted value, `null` when no value was produced, or rethrows the error signalled by the maybe.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Superseded by [awaitSingleOrNull], whose name makes it explicit that `null` denotes the absence of a value
 * rather than the function throwing in that case.
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    replaceWith = ReplaceWith("this.awaitSingleOrNull()"),
    level = DeprecationLevel.HIDDEN
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T & Any>.await(): T? {
    return awaitSingleOrNull()
}
87 
/**
 * Suspends the caller until this maybe terminates, without blocking a thread.
 * Returns the emitted value, [default] when no value was produced, or rethrows the error signalled by the maybe.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * ### Deprecation
 *
 * Superseded by [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
 * details).
 *
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default"),
    level = DeprecationLevel.HIDDEN
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T {
    val result: T? = awaitSingleOrNull()
    return result ?: default
}
110 
111 // ------------------------ SingleSource ------------------------
112 
/**
 * Suspends the caller until this [SingleSource] delivers its value, without blocking the thread.
 * Returns that value, or rethrows the error signalled by the source.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { continuation ->
    val observer = object : SingleObserver<T & Any> {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) {
            continuation.disposeOnCancellation(d)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }

        override fun onSuccess(t: T & Any) {
            continuation.resume(t)
        }
    }
    subscribe(observer)
}
128 
129 // ------------------------ ObservableSource ------------------------
130 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns it.
 * If the observable signals an error instead, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T {
    val first = awaitOne(Mode.FIRST)
    return first as T
}
143 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns it.
 * When the observable completes without emitting anything, [default] is returned instead; if it signals an
 * error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T {
    val firstOrDefault = awaitOne(Mode.FIRST_OR_DEFAULT, default)
    return firstOrDefault as T
}
156 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns it.
 * When the observable completes without emitting anything, `null` is returned; if it signals an error, the
 * corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? {
    // No explicit default is passed, so an empty observable yields awaitOne's default of `null`.
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
167 
/**
 * Suspends, without blocking the thread, until the given [Observable] emits its first value and returns it.
 * When the observable completes without emitting anything, [defaultValue] is invoked to produce the result; if
 * the observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T {
    val first: T? = awaitOne(Mode.FIRST_OR_DEFAULT)
    // The fallback is only evaluated when no value was emitted.
    return first ?: defaultValue()
}
179 
/**
 * Suspends, without blocking the thread, until the given [Observable] completes and returns the last value it
 * emitted. If the observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T {
    val last = awaitOne(Mode.LAST)
    return last as T
}
192 
/**
 * Suspends, without blocking the thread, until the given observable completes and returns the one value it
 * emitted. If the observable signals an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it immediately
 * disposes of its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 * @throws IllegalArgumentException if the observable emits more than one value
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T {
    val single = awaitOne(Mode.SINGLE)
    return single as T
}
206 
207 // ------------------------ private ------------------------
208 
/**
 * Registers a cancellation handler on this continuation that disposes [d].
 */
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) {
    // The cancellation cause is irrelevant here; the disposable is disposed regardless.
    invokeOnCancellation { _ -> d.dispose() }
}
211 
/**
 * Selects which value `awaitOne` waits for. Each constant carries the name of an await function in [s],
 * which is also its string representation (interpolated into `awaitOne`'s exception messages).
 */
private enum class Mode(@JvmField val s: String) {
    FIRST("awaitFirst"),
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),
    LAST("awaitLast"),
    SINGLE("awaitSingle");

    override fun toString(): String {
        return s
    }
}
219 
/**
 * Shared implementation of the `ObservableSource.awaitXxx` functions: subscribes to this source and suspends
 * until a result selected by [mode] is available.
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] resume with the first emitted value and then dispose the subscription.
 * - [Mode.LAST] remembers the most recent value and resumes with it on completion.
 * - [Mode.SINGLE] behaves like [Mode.LAST], but a second emission resumes with [IllegalArgumentException]
 *   and disposes the subscription.
 *
 * If the source completes without emitting, [Mode.FIRST_OR_DEFAULT] resumes with [default]; every other mode
 * resumes with [NoSuchElementException]. An error signalled by the source is rethrown to the caller.
 * Cancelling the continuation disposes the subscription.
 *
 * @param mode which value to wait for; its string form appears in the exception messages.
 * @param default the result used by [Mode.FIRST_OR_DEFAULT] when nothing is emitted.
 */
private suspend fun <T> ObservableSource<T & Any>.awaitOne(
    mode: Mode,
    default: T? = null
): T? = suspendCancellableCoroutine { cont ->
    subscribe(object : Observer<T & Any> {
        private lateinit var subscription: Disposable
        private var value: T? = null
        private var seenValue = false

        override fun onSubscribe(sub: Disposable) {
            subscription = sub
            // Same helper the other observers in this file use: dispose when the coroutine is cancelled.
            cont.disposeOnCancellation(sub)
        }

        override fun onNext(t: T & Any) {
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    // Only the first emission matters; later ones are ignored.
                    if (!seenValue) {
                        seenValue = true
                        cont.resume(t)
                        subscription.dispose()
                    }
                }
                Mode.LAST, Mode.SINGLE -> {
                    if (mode == Mode.SINGLE && seenValue) {
                        // A second value violates the SINGLE contract; fail unless already resumed or cancelled.
                        if (cont.isActive)
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                        subscription.dispose()
                    } else {
                        // Keep the latest value; it is delivered from onComplete.
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (seenValue) {
                // In the FIRST modes (and after a SINGLE failure) the continuation was already resumed in onNext.
                if (cont.isActive) cont.resume(value as T)
                return
            }
            when {
                mode == Mode.FIRST_OR_DEFAULT -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            cont.resumeWithException(e)
        }
    })
}
277 
278