• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx2
2 
3 import io.reactivex.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 
/**
 * Creates a cold [single][Single] that runs the given [block] in a coroutine and emits its result.
 * Every time the returned single is subscribed, it starts a new coroutine.
 * Unsubscribing cancels the running coroutine.
 * The coroutine context can be specified with the [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @param context additional coroutine context elements; must not contain a [Job].
 * @param block the suspending computation whose result is emitted by the single.
 * @return a [Single] backed by a coroutine started in [GlobalScope] combined with [context].
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun <T : Any> rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    // The coroutine's lifetime is tied to the subscription, so an externally supplied Job is rejected.
    // Note the space after "it." so the two sentences of the message do not run together.
    require(context[Job] === null) {
        "Single context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return rxSingleInternal(GlobalScope, context, block)
}
23 
/**
 * Shared implementation behind the `rxSingle` builders.
 *
 * For each subscription it derives a coroutine context from [scope] and [context], creates an
 * [RxSingleCoroutine] bound to the emitter, registers an [RxCancellable] wrapping that coroutine as the
 * emitter's cancellable, and then starts the coroutine with [CoroutineStart.DEFAULT] to run [block].
 */
private fun <T : Any> rxSingleInternal(
    scope: CoroutineScope, // support for legacy rxSingle in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    return Single.create { emitter ->
        val singleCoroutine = RxSingleCoroutine(scope.newCoroutineContext(context), emitter)
        // Register the cancellable before starting the coroutine.
        emitter.setCancellable(RxCancellable(singleCoroutine))
        singleCoroutine.start(CoroutineStart.DEFAULT, singleCoroutine, block)
    }
}
34 
/**
 * Coroutine whose outcome is forwarded to a [SingleEmitter]: a result goes to `onSuccess`, and a
 * cancellation cause goes to `tryOnError`. Anything that cannot be delivered is passed to
 * `handleUndeliverableException` together with this coroutine's context.
 */
private class RxSingleCoroutine<T : Any>(
    parentContext: CoroutineContext,
    private val subscriber: SingleEmitter<T>
) : AbstractCoroutine<T>(
    parentContext,
    false, // presumably initParentJob — TODO confirm against AbstractCoroutine's constructor
    true // presumably active — TODO confirm against AbstractCoroutine's constructor
) {
    /** Emits [value] downstream; an exception thrown by the emitter is reported as undeliverable. */
    override fun onCompleted(value: T) {
        try {
            subscriber.onSuccess(value)
        } catch (failure: Throwable) {
            handleUndeliverableException(failure, context)
        }
    }

    /**
     * Offers [cause] to the emitter. If the emitter does not accept it, or throws while handling it
     * (the thrown exception is then attached to [cause] as suppressed), [cause] is reported as
     * undeliverable.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val delivered = try {
            subscriber.tryOnError(cause)
        } catch (failure: Throwable) {
            cause.addSuppressed(failure)
            false
        }
        if (!delivered) {
            handleUndeliverableException(cause, context)
        }
    }
}
58 
/**
 * Legacy receiver-based variant of `rxSingle`: passes the receiver scope, [context] and [block]
 * straight through to [rxSingleInternal].
 *
 * @suppress
 */
@Deprecated(
    message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
    replaceWith = ReplaceWith("rxSingle(context, block)"),
    level = DeprecationLevel.HIDDEN
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
public fun <T : Any> CoroutineScope.rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    val legacyScope: CoroutineScope = this
    return rxSingleInternal(legacyScope, context, block)
}
69