• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx2
2 
3 import io.reactivex.*
4 import io.reactivex.disposables.Disposable
5 import kotlinx.coroutines.CancellableContinuation
6 import kotlinx.coroutines.CancellationException
7 import kotlinx.coroutines.Job
8 import kotlinx.coroutines.suspendCancellableCoroutine
9 import kotlin.coroutines.*
10 
11 // ------------------------ CompletableSource ------------------------
12 
/**
 * Suspends the calling coroutine until this completable terminates, without blocking the thread.
 * Returns `Unit` on successful completion, or throws the exception reported by the completable.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled while this
 * function is suspended, it immediately resumes with [CancellationException] and disposes of its subscription.
 */
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { continuation ->
    val observer = object : CompletableObserver {
        // Bind the subscription to the continuation so that cancelling the coroutine disposes of it.
        override fun onSubscribe(disposable: Disposable) = continuation.disposeOnCancellation(disposable)

        override fun onComplete() = continuation.resume(Unit)

        override fun onError(cause: Throwable) = continuation.resumeWithException(cause)
    }
    subscribe(observer)
}
36 
37 // ------------------------ MaybeSource ------------------------
38 
/**
 * Suspends the calling coroutine until this [MaybeSource] terminates, without blocking the thread.
 * Returns the produced value, `null` if the source completes without a value, or throws the exception
 * reported by the source.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it immediately resumes
 * with [CancellationException] and disposes of its subscription.
 */
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { continuation ->
    val observer = object : MaybeObserver<T> {
        // Bind the subscription to the continuation so that cancelling the coroutine disposes of it.
        override fun onSubscribe(disposable: Disposable) = continuation.disposeOnCancellation(disposable)

        // Completion without a value is reported to the caller as `null`, not as a failure.
        override fun onComplete() = continuation.resume(null)

        override fun onSuccess(item: T & Any) = continuation.resume(item)

        override fun onError(cause: Throwable) = continuation.resumeWithException(cause)
    }
    subscribe(observer)
}
67 
/**
 * Suspends the calling coroutine until this [MaybeSource] terminates, without blocking the thread.
 * Returns the produced value; throws if the source completes without a value or reports an error.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it immediately resumes
 * with [CancellationException] and disposes of its subscription.
 *
 * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
 */
public suspend fun <T> MaybeSource<T>.awaitSingle(): T {
    val produced = awaitSingleOrNull()
    // `null` here means the source completed empty.
    if (produced == null) throw NoSuchElementException()
    return produced
}
79 
/**
 * Suspends until the maybe terminates, without blocking a thread.
 * Returns the produced value, `null` if no value was produced, or throws the exception reported by the maybe.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it immediately resumes
 * with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull], whose name makes it explicit that `null` is returned to denote the
 * absence of a value, as opposed to throwing in that case.
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    replaceWith = ReplaceWith("this.awaitSingleOrNull()"),
    level = DeprecationLevel.HIDDEN,
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.await(): T? {
    return awaitSingleOrNull()
}
101 
/**
 * Suspends until the maybe terminates, without blocking a thread.
 * Returns the produced value, [default] if no value was produced, or throws the exception reported by the maybe.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while this function is waiting, it immediately resumes
 * with [CancellationException].
 *
 * ### Deprecation
 *
 * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await]
 * for details).
 * @suppress
 */
@Deprecated(
    message = "Deprecated in favor of awaitSingleOrNull()",
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default"),
    level = DeprecationLevel.HIDDEN,
) // Warning since 1.5, error in 1.6, hidden in 1.7
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T {
    val produced = awaitSingleOrNull()
    // Fall back to the caller-supplied value when the maybe completed empty.
    return if (produced != null) produced else default
}
123 
124 // ------------------------ SingleSource ------------------------
125 
/**
 * Suspends the calling coroutine until this single-value response terminates, without blocking the thread.
 * Returns the produced value, or throws the exception reported by the response.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 */
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { continuation ->
    val observer = object : SingleObserver<T> {
        // Bind the subscription to the continuation so that cancelling the coroutine disposes of it.
        override fun onSubscribe(disposable: Disposable) = continuation.disposeOnCancellation(disposable)

        override fun onSuccess(item: T & Any) = continuation.resume(item)

        override fun onError(cause: Throwable) = continuation.resumeWithException(cause)
    }
    subscribe(observer)
}
149 
150 // ------------------------ ObservableSource ------------------------
151 
/**
 * Suspends until the given [Observable] emits its first value and returns that value, without blocking the
 * thread. If the observable reports an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitFirst(): T {
    return awaitOne(mode = Mode.FIRST)
}
163 
/**
 * Suspends until the given [Observable] emits its first value and returns that value, without blocking the
 * thread. Returns [default] if the observable completes without emitting anything. If the observable reports
 * an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT, default = default)
}
174 
/**
 * Suspends until the given [Observable] emits its first value and returns that value, without blocking the
 * thread. Returns `null` if the observable completes without emitting anything. If the observable reports an
 * error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? {
    // No explicit default is passed, so an empty observable yields `null`.
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
185 
/**
 * Suspends until the given [Observable] emits its first value and returns that value, without blocking the
 * thread. If the observable completes without emitting anything, [defaultValue] is called to obtain the
 * result. If the observable reports an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    val first = awaitOne(Mode.FIRST_OR_DEFAULT)
    // The fallback is only evaluated when no value was obtained.
    return first ?: defaultValue()
}
197 
/**
 * Suspends until the given [Observable] completes and returns the last value it emitted, without blocking the
 * thread. If the observable reports an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 */
public suspend fun <T> ObservableSource<T>.awaitLast(): T {
    return awaitOne(mode = Mode.LAST)
}
209 
/**
 * Suspends until the given observable completes and returns the single value it emitted, without blocking the
 * thread. If the observable reports an error, the corresponding exception is thrown.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the function is waiting, it immediately disposes of
 * its subscription and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable does not emit any value
 * @throws IllegalArgumentException if the observable emits more than one value
 */
public suspend fun <T> ObservableSource<T>.awaitSingle(): T {
    return awaitOne(mode = Mode.SINGLE)
}
222 
223 // ------------------------ private ------------------------
224 
/**
 * Registers a cancellation handler on this continuation that disposes of [d].
 */
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) {
    invokeOnCancellation { _ -> d.dispose() }
}
227 
/**
 * Selects which element of an observable an await operation waits for.
 *
 * [s] is the text returned by [toString], so the mode can be interpolated directly into messages.
 */
private enum class Mode(val s: String) {
    /** Take the first emitted value. */
    FIRST("awaitFirst"),

    /** Take the first emitted value, with a fallback when nothing is emitted. */
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),

    /** Take the last emitted value. */
    LAST("awaitLast"),

    /** Take the one and only emitted value. */
    SINGLE("awaitSingle");

    override fun toString(): String {
        return s
    }
}
235 
/**
 * Subscribes to this [ObservableSource] and suspends until the element selected by [mode] is known.
 *
 * - [Mode.FIRST] and [Mode.FIRST_OR_DEFAULT] resume with the first value received and then dispose of the
 *   subscription; any further values are ignored.
 * - [Mode.LAST] keeps the most recent value and resumes with it on completion.
 * - [Mode.SINGLE] keeps the first value and resumes with it on completion; a second value resumes with
 *   [IllegalArgumentException] and disposes of the subscription.
 *
 * On completion without any value, [Mode.FIRST_OR_DEFAULT] resumes with [default]; every other mode resumes
 * with [NoSuchElementException]. An error from the source resumes with that error. Cancelling the continuation
 * disposes of the subscription.
 *
 * @param mode which element to wait for
 * @param default the value used by [Mode.FIRST_OR_DEFAULT] when nothing was emitted; `null` unless supplied
 */
private suspend fun <T> ObservableSource<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    val observer = object : Observer<T> {
        private lateinit var upstream: Disposable
        private var latest: T? = null
        private var hasValue = false

        override fun onSubscribe(sub: Disposable) {
            upstream = sub
            cont.invokeOnCancellation { sub.dispose() }
        }

        override fun onNext(t: T & Any) = when (mode) {
            Mode.FIRST, Mode.FIRST_OR_DEFAULT -> deliverFirst(t)
            Mode.LAST, Mode.SINGLE -> remember(t)
        }

        /** Resumes with the first value only, then disposes of the subscription. */
        private fun deliverFirst(item: T & Any) {
            if (hasValue) return
            hasValue = true
            cont.resume(item)
            upstream.dispose()
        }

        /** Stores [item] for delivery on completion; in SINGLE mode a second value is a failure. */
        private fun remember(item: T & Any) {
            val duplicate = mode == Mode.SINGLE && hasValue
            if (!duplicate) {
                latest = item
                hasValue = true
                return
            }
            if (cont.isActive) {
                cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
            }
            upstream.dispose()
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            when {
                // A value was seen: deliver it unless the continuation is no longer active.
                hasValue -> {
                    if (cont.isActive) cont.resume(latest as T)
                }
                mode == Mode.FIRST_OR_DEFAULT -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            cont.resumeWithException(e)
        }
    }
    subscribe(observer)
}
293 
294