<lambda>null1 package kotlinx.coroutines.rx3
2
3 import io.reactivex.rxjava3.core.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6
/**
 * Creates a cold [Completable] that runs the given [block] in a coroutine and signals completion
 * once the block finishes (a [Completable] carries no value).
 * Every time the returned completable is subscribed, it starts a new coroutine.
 * Unsubscribing cancels the running coroutine.
 * The coroutine context can be specified with the [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @param context additional coroutine context for the started coroutine; must not contain a [Job].
 * @param block suspending code executed anew for every subscription.
 * @return a [Completable] backed by the coroutine running [block].
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun rxCompletable(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> Unit
): Completable {
    // The coroutine's lifetime is driven by the subscription, so an externally supplied Job is rejected.
    require(context[Job] === null) {
        // Note the trailing space: the two literals are concatenated into a single sentence pair.
        "Completable context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return rxCompletableInternal(GlobalScope, context, block)
}
23
/**
 * Builds the [Completable] behind `rxCompletable`.
 *
 * On every subscription a fresh [RxCompletableCoroutine] is created from [scope] combined with
 * [context], registered with the emitter as its cancellable, and then started with [block].
 *
 * @param scope scope whose context is merged with [context] (support for legacy rxCompletable in scope).
 * @param context additional context elements for the new coroutine.
 * @param block suspending code to run inside the coroutine.
 */
private fun rxCompletableInternal(
    scope: CoroutineScope, // support for legacy rxCompletable in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { emitter ->
    RxCompletableCoroutine(scope.newCoroutineContext(context), emitter).also { job ->
        // Register the cancellable before starting, so the handle is in place when the block begins to run.
        emitter.setCancellable(RxCancellable(job))
        job.start(CoroutineStart.DEFAULT, job, block)
    }
}
34
/**
 * Coroutine that relays its terminal state to an Rx [CompletableEmitter]:
 * normal completion becomes `onComplete`, cancellation/failure is offered via `tryOnError`.
 * Anything the emitter cannot accept is passed to `handleUndeliverableException`.
 */
private class RxCompletableCoroutine(
    parentContext: CoroutineContext,
    private val subscriber: CompletableEmitter
) : AbstractCoroutine<Unit>(
    parentContext,
    false, // NOTE(review): presumably `initParentJob` — confirm against the AbstractCoroutine version in use
    true // NOTE(review): presumably `active` — confirm against the AbstractCoroutine version in use
) {
    /** Signals completion downstream; an exception thrown by the emitter is reported as undeliverable. */
    override fun onCompleted(value: Unit) {
        try {
            subscriber.onComplete()
        } catch (failure: Throwable) {
            handleUndeliverableException(failure, context)
        }
    }

    /**
     * Offers [cause] to the emitter. If the emitter rejects it, or throws while handling it
     * (the thrown exception is attached to [cause] as suppressed), [cause] is reported as undeliverable.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val delivered = try {
            subscriber.tryOnError(cause)
        } catch (failure: Throwable) {
            cause.addSuppressed(failure)
            false
        }
        if (!delivered) {
            handleUndeliverableException(cause, context)
        }
    }
}
58