• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import io.reactivex.rxjava3.core.*
4 import io.reactivex.rxjava3.disposables.Disposable
5 import kotlinx.coroutines.CancellableContinuation
6 import kotlinx.coroutines.CancellationException
7 import kotlinx.coroutines.Job
8 import kotlinx.coroutines.suspendCancellableCoroutine
9 import kotlin.coroutines.*
10 
11 // ------------------------ CompletableSource ------------------------
12 
13 /**
14  * Awaits for completion of this completable without blocking the thread.
15  * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
16  *
17  * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled while this
18  * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
19  * subscription.
20  */
21 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
22     subscribe(object : CompletableObserver {
23         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
24         override fun onComplete() { cont.resume(Unit) }
25         override fun onError(e: Throwable) { cont.resumeWithException(e) }
26     })
27 }
28 
29 // ------------------------ MaybeSource ------------------------
30 
31 /**
32  * Awaits for completion of the [MaybeSource] without blocking the thread.
33  * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
34  * [MaybeSource] produces an error.
35  *
36  * This suspending function is cancellable.
37  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
38  * function immediately resumes with [CancellationException] and disposes of its subscription.
39  */
40 public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
41     subscribe(object : MaybeObserver<T & Any> {
onSubscribenull42         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
onCompletenull43         override fun onComplete() { cont.resume(null) }
44         override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull45         override fun onError(error: Throwable) { cont.resumeWithException(error) }
46     })
47 }
48 
49 /**
50  * Awaits for completion of the [MaybeSource] without blocking the thread.
51  * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
52  *
53  * This suspending function is cancellable.
54  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this
55  * function immediately resumes with [CancellationException] and disposes of its subscription.
56  *
57  * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
58  */
59 public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
60 
61 /**
62  * Awaits for completion of the maybe without blocking a thread.
63  * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
64  * maybe had produced error.
65  *
66  * This suspending function is cancellable.
67  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
68  * immediately resumes with [CancellationException].
69  *
70  * ### Deprecation
71  *
72  * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
73  * a value, as opposed to throwing in such case.
74  *
75  * @suppress
76  */
77 @Deprecated(
78     message = "Deprecated in favor of awaitSingleOrNull()",
79     level = DeprecationLevel.HIDDEN,
80     replaceWith = ReplaceWith("this.awaitSingleOrNull()")
81 ) // Warning since 1.5, error in 1.6, hidden in 1.7
82 public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull()
83 
84 /**
85  * Awaits for completion of the maybe without blocking a thread.
86  * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
87  * maybe had produced error.
88  *
89  * This suspending function is cancellable.
90  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
91  * immediately resumes with [CancellationException].
92  *
93  * ### Deprecation
94  *
95  * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
96  * details).
97  *
98  * @suppress
99  */
100 @Deprecated(
101     message = "Deprecated in favor of awaitSingleOrNull()",
102     level = DeprecationLevel.HIDDEN,
103     replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
104 ) // Warning since 1.5, error in 1.6, hidden in 1.7
105 public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
106 
107 // ------------------------ SingleSource ------------------------
108 
109 /**
110  * Awaits for completion of the single value response without blocking the thread.
111  * Returns the resulting value, or throws the corresponding exception if this response produces an error.
112  *
113  * This suspending function is cancellable.
114  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
115  * function immediately disposes of its subscription and resumes with [CancellationException].
116  */
117 public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont ->
118     subscribe(object : SingleObserver<T & Any> {
onSubscribenull119         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
120         override fun onSuccess(t: T & Any) { cont.resume(t) }
onErrornull121         override fun onError(error: Throwable) { cont.resumeWithException(error) }
122     })
123 }
124 
125 // ------------------------ ObservableSource ------------------------
126 
127 /**
128  * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
129  * if the observable has produced an error, throws the corresponding exception.
130  *
131  * This suspending function is cancellable.
132  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
133  * function immediately disposes of its subscription and resumes with [CancellationException].
134  *
135  * @throws NoSuchElementException if the observable does not emit any value
136  */
137 @Suppress("UNCHECKED_CAST")
138 public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T
139 
140 /**
141  * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
142  * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
143  * corresponding exception.
144  *
145  * This suspending function is cancellable.
146  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
147  * function immediately disposes of its subscription and resumes with [CancellationException].
148  */
149 @Suppress("UNCHECKED_CAST")
150 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T =
151     awaitOne(Mode.FIRST_OR_DEFAULT, default) as T
152 
153 /**
154  * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
155  * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
156  * exception.
157  *
158  * This suspending function is cancellable.
159  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
160  * function immediately disposes of its subscription and resumes with [CancellationException].
161  */
162 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
163 
164 /**
165  * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
166  * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
167  * the corresponding exception.
168  *
169  * This suspending function is cancellable.
170  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
171  * function immediately disposes of its subscription and resumes with [CancellationException].
172  */
173 public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T =
174     awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
175 
176 /**
177  * Awaits the last value from the given [Observable] without blocking the thread and
178  * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
179  *
180  * This suspending function is cancellable.
181  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
182  * function immediately disposes of its subscription and resumes with [CancellationException].
183  *
184  * @throws NoSuchElementException if the observable does not emit any value
185  */
186 @Suppress("UNCHECKED_CAST")
187 public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T
188 
189 /**
190  * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
191  * if this observable has produced an error, throws the corresponding exception.
192  *
193  * This suspending function is cancellable.
194  * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
195  * function immediately disposes of its subscription and resumes with [CancellationException].
196  *
197  * @throws NoSuchElementException if the observable does not emit any value
198  * @throws IllegalArgumentException if the observable emits more than one value
199  */
200 @Suppress("UNCHECKED_CAST")
201 public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T
202 
203 // ------------------------ private ------------------------
204 
disposeOnCancellationnull205 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
206     invokeOnCancellation { d.dispose() }
207 
208 private enum class Mode(@JvmField val s: String) {
209     FIRST("awaitFirst"),
210     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
211     LAST("awaitLast"),
212     SINGLE("awaitSingle");
toStringnull213     override fun toString(): String = s
214 }
215 
216 private suspend fun <T> ObservableSource<T & Any>.awaitOne(
217     mode: Mode,
218     default: T? = null
219 ): T? = suspendCancellableCoroutine { cont ->
220     subscribe(object : Observer<T & Any> {
221         private lateinit var subscription: Disposable
222         private var value: T? = null
223         private var seenValue = false
224 
225         override fun onSubscribe(sub: Disposable) {
226             subscription = sub
227             cont.invokeOnCancellation { sub.dispose() }
228         }
229 
230         override fun onNext(t: T & Any) {
231             when (mode) {
232                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
233                     if (!seenValue) {
234                         seenValue = true
235                         cont.resume(t)
236                         subscription.dispose()
237                     }
238                 }
239                 Mode.LAST, Mode.SINGLE -> {
240                     if (mode == Mode.SINGLE && seenValue) {
241                         if (cont.isActive)
242                             cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
243                         subscription.dispose()
244                     } else {
245                         value = t
246                         seenValue = true
247                     }
248                 }
249             }
250         }
251 
252         @Suppress("UNCHECKED_CAST")
253         override fun onComplete() {
254             if (seenValue) {
255                 if (cont.isActive) cont.resume(value as T)
256                 return
257             }
258             when {
259                 mode == Mode.FIRST_OR_DEFAULT -> {
260                     cont.resume(default as T)
261                 }
262                 cont.isActive -> {
263                     cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
264                 }
265             }
266         }
267 
268         override fun onError(e: Throwable) {
269             cont.resumeWithException(e)
270         }
271     })
272 }
273 
274