• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14 
15 // ------------------------ CompletableSource ------------------------
16 
17 /**
18  * Awaits for completion of this completable without blocking the thread.
19  * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
20  *
21  * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
22  * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
23  * subscription.
24  */
25 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
26     subscribe(object : CompletableObserver {
27         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
28         override fun onComplete() { cont.resume(Unit) }
29         override fun onError(e: Throwable) { cont.resumeWithException(e) }
30     })
31 }
32 
33 // ------------------------ MaybeSource ------------------------
34 
35 /**
36  * Awaits for completion of the [MaybeSource] without blocking the thread.
37  * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
38  * [MaybeSource] produces an error.
39  *
40  * This suspending function is cancellable.
41  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
42  * function immediately resumes with [CancellationException] and disposes of its subscription.
43  */
44 @Suppress("UNCHECKED_CAST")
awaitSingleOrNullnull45 public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
46     subscribe(object : MaybeObserver<T> {
47         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
48         override fun onComplete() { cont.resume(null) }
49         override fun onSuccess(t: T) { cont.resume(t) }
50         override fun onError(error: Throwable) { cont.resumeWithException(error) }
51     })
52 }
53 
54 /**
55  * Awaits for completion of the [MaybeSource] without blocking the thread.
56  * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
57  *
58  * This suspending function is cancellable.
59  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
60  * function immediately resumes with [CancellationException] and disposes of its subscription.
61  *
62  * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
63  */
awaitSinglenull64 public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
65 
66 /**
67  * Awaits for completion of the maybe without blocking a thread.
68  * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
69  * maybe had produced error.
70  *
71  * This suspending function is cancellable.
72  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
73  * immediately resumes with [CancellationException].
74  *
75  * ### Deprecation
76  *
77  * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
78  * a value, as opposed to throwing in such case.
79  *
80  * @suppress
81  */
82 @Deprecated(
83     message = "Deprecated in favor of awaitSingleOrNull()",
84     level = DeprecationLevel.ERROR,
85     replaceWith = ReplaceWith("this.awaitSingleOrNull()")
86 ) // Warning since 1.5, error in 1.6, hidden in 1.7
87 public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
88 
89 /**
90  * Awaits for completion of the maybe without blocking a thread.
91  * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
92  * maybe had produced error.
93  *
94  * This suspending function is cancellable.
95  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
96  * immediately resumes with [CancellationException].
97  *
98  * ### Deprecation
99  *
100  * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
101  * details).
102  *
103  * @suppress
104  */
105 @Deprecated(
106     message = "Deprecated in favor of awaitSingleOrNull()",
107     level = DeprecationLevel.ERROR,
108     replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
109 ) // Warning since 1.5, error in 1.6, hidden in 1.7
110 public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
111 
112 // ------------------------ SingleSource ------------------------
113 
114 /**
115  * Awaits for completion of the single value response without blocking the thread.
116  * Returns the resulting value, or throws the corresponding exception if this response produces an error.
117  *
118  * This suspending function is cancellable.
119  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
120  * function immediately disposes of its subscription and resumes with [CancellationException].
121  */
122 public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
123     subscribe(object : SingleObserver<T> {
124         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
125         override fun onSuccess(t: T) { cont.resume(t) }
126         override fun onError(error: Throwable) { cont.resumeWithException(error) }
127     })
128 }
129 
130 // ------------------------ ObservableSource ------------------------
131 
132 /**
133  * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
134  * if the observable has produced an error, throws the corresponding exception.
135  *
136  * This suspending function is cancellable.
137  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
138  * function immediately disposes of its subscription and resumes with [CancellationException].
139  *
140  * @throws NoSuchElementException if the observable does not emit any value
141  */
awaitFirstnull142 public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
143 
144 /**
145  * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
146  * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
147  * corresponding exception.
148  *
149  * This suspending function is cancellable.
150  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
151  * function immediately disposes of its subscription and resumes with [CancellationException].
152  */
153 public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
154 
155 /**
156  * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
157  * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
158  * exception.
159  *
160  * This suspending function is cancellable.
161  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
162  * function immediately disposes of its subscription and resumes with [CancellationException].
163  */
164 public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
165 
166 /**
167  * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
168  * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
169  * the corresponding exception.
170  *
171  * This suspending function is cancellable.
172  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
173  * function immediately disposes of its subscription and resumes with [CancellationException].
174  */
175 public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
176     awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
177 
178 /**
179  * Awaits the last value from the given [Observable] without blocking the thread and
180  * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
181  *
182  * This suspending function is cancellable.
183  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
184  * function immediately disposes of its subscription and resumes with [CancellationException].
185  *
186  * @throws NoSuchElementException if the observable does not emit any value
187  */
188 public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
189 
190 /**
191  * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
192  * if this observable has produced an error, throws the corresponding exception.
193  *
194  * This suspending function is cancellable.
195  * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
196  * function immediately disposes of its subscription and resumes with [CancellationException].
197  *
198  * @throws NoSuchElementException if the observable does not emit any value
199  * @throws IllegalArgumentException if the observable emits more than one value
200  */
201 public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
202 
203 // ------------------------ private ------------------------
204 
205 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
206     invokeOnCancellation { d.dispose() }
207 
208 private enum class Mode(val s: String) {
209     FIRST("awaitFirst"),
210     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
211     LAST("awaitLast"),
212     SINGLE("awaitSingle");
toStringnull213     override fun toString(): String = s
214 }
215 
216 private suspend fun <T> ObservableSource<T>.awaitOne(
217     mode: Mode,
218     default: T? = null
219 ): T = suspendCancellableCoroutine { cont ->
220     subscribe(object : Observer<T> {
221         private lateinit var subscription: Disposable
222         private var value: T? = null
223         private var seenValue = false
224 
225         override fun onSubscribe(sub: Disposable) {
226             subscription = sub
227             cont.invokeOnCancellation { sub.dispose() }
228         }
229 
230         override fun onNext(t: T) {
231             when (mode) {
232                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
233                     if (!seenValue) {
234                         seenValue = true
235                         cont.resume(t)
236                         subscription.dispose()
237                     }
238                 }
239                 Mode.LAST, Mode.SINGLE -> {
240                     if (mode == Mode.SINGLE && seenValue) {
241                         if (cont.isActive)
242                             cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
243                         subscription.dispose()
244                     } else {
245                         value = t
246                         seenValue = true
247                     }
248                 }
249             }
250         }
251 
252         @Suppress("UNCHECKED_CAST")
253         override fun onComplete() {
254             if (seenValue) {
255                 if (cont.isActive) cont.resume(value as T)
256                 return
257             }
258             when {
259                 mode == Mode.FIRST_OR_DEFAULT -> {
260                     cont.resume(default as T)
261                 }
262                 cont.isActive -> {
263                     cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
264                 }
265             }
266         }
267 
268         override fun onError(e: Throwable) {
269             cont.resumeWithException(e)
270         }
271     })
272 }
273 
274