• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 
/**
 * Creates a cold [single][Single] that runs the given [block] in a coroutine and emits its result.
 * Every time the returned single is subscribed, it starts a new coroutine.
 * Unsubscribing cancels the running coroutine.
 * Coroutine context can be specified with the [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @param context additional coroutine context elements; must not contain a [Job].
 * @param block the suspending computation whose result is emitted by the returned [Single].
 * @return a [Single] that launches [block] anew for each subscriber.
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun <T : Any> rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    // The coroutine's lifetime is tied to the subscription, so an externally supplied Job is rejected up front.
    require(context[Job] === null) {
        // Note the trailing space: the two sentences used to be glued together ("in it.Its lifecycle").
        "Single context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return rxSingleInternal(GlobalScope, context, block)
}
27 
/**
 * Shared builder behind both the top-level and the legacy scope-receiver `rxSingle`.
 *
 * For every subscription it derives a coroutine context from [scope] and [context], wraps the
 * emitter in an [RxSingleCoroutine], registers that coroutine with the emitter through
 * [RxCancellable], and then starts [block] with [CoroutineStart.DEFAULT].
 */
private fun <T : Any> rxSingleInternal(
    scope: CoroutineScope, // support for legacy rxSingle in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    return Single.create { emitter ->
        val singleCoroutine = RxSingleCoroutine(scope.newCoroutineContext(context), emitter)
        // Register the cancellable before starting, so it is already in place when the block runs.
        emitter.setCancellable(RxCancellable(singleCoroutine))
        singleCoroutine.start(CoroutineStart.DEFAULT, singleCoroutine, block)
    }
}
38 
/**
 * Coroutine that forwards its outcome to a [SingleEmitter]: a completed value goes to
 * `onSuccess`, a cancellation/failure cause goes to `tryOnError`.
 * Anything that cannot be delivered is passed to `handleUndeliverableException`.
 */
private class RxSingleCoroutine<T : Any>(
    parentContext: CoroutineContext,
    private val subscriber: SingleEmitter<T>
) : AbstractCoroutine<T>(parentContext, false, true) { // NOTE(review): presumably initParentJob = false, active = true - confirm

    /** Emits [value] downstream; an exception thrown by the emitter is reported as undeliverable. */
    override fun onCompleted(value: T) {
        try {
            subscriber.onSuccess(value)
        } catch (failure: Throwable) {
            handleUndeliverableException(failure, context)
        }
    }

    /**
     * Tries to signal [cause] downstream. When the emitter declines it (returns `false`) or
     * throws, [cause] is reported as undeliverable, with the thrown exception attached as suppressed.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val delivered = try {
            subscriber.tryOnError(cause)
        } catch (failure: Throwable) {
            cause.addSuppressed(failure)
            false
        }
        if (!delivered) {
            handleUndeliverableException(cause, context)
        }
    }
}
62 
/**
 * Legacy scope-receiver builder kept as a hidden deprecated declaration; it delegates to
 * `rxSingleInternal` with the receiver as the scope.
 *
 * @suppress
 */
@Deprecated(
    message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
    replaceWith = ReplaceWith("rxSingle(context, block)"),
    level = DeprecationLevel.HIDDEN
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
public fun <T : Any> CoroutineScope.rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    return rxSingleInternal(this, context, block)
}
73