1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10
11 /**
12 * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result.
13 * If [block] result is `null`, [onComplete][MaybeObserver.onComplete] is invoked without a value.
14 * Every time the returned observable is subscribed, it starts a new coroutine.
15 * Unsubscribing cancels running coroutine.
16 * Coroutine context can be specified with [context] argument.
17 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
18 * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
19 */
20 public fun <T> rxMaybe(
21 context: CoroutineContext = EmptyCoroutineContext,
22 block: suspend CoroutineScope.() -> T?
23 ): Maybe<T> {
24 require(context[Job] === null) { "Maybe context cannot contain job in it." +
25 "Its lifecycle should be managed via Disposable handle. Had $context" }
26 return rxMaybeInternal(GlobalScope, context, block)
27 }
28
rxMaybeInternalnull29 private fun <T> rxMaybeInternal(
30 scope: CoroutineScope, // support for legacy rxMaybe in scope
31 context: CoroutineContext,
32 block: suspend CoroutineScope.() -> T?
33 ): Maybe<T> = Maybe.create { subscriber ->
34 val newContext = scope.newCoroutineContext(context)
35 val coroutine = RxMaybeCoroutine(newContext, subscriber)
36 subscriber.setCancellable(RxCancellable(coroutine))
37 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
38 }
39
40 private class RxMaybeCoroutine<T>(
41 parentContext: CoroutineContext,
42 private val subscriber: MaybeEmitter<T>
43 ) : AbstractCoroutine<T>(parentContext, false, true) {
onCompletednull44 override fun onCompleted(value: T) {
45 try {
46 if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
47 } catch (e: Throwable) {
48 handleUndeliverableException(e, context)
49 }
50 }
51
onCancellednull52 override fun onCancelled(cause: Throwable, handled: Boolean) {
53 try {
54 if (subscriber.tryOnError(cause)) {
55 return
56 }
57 } catch (e: Throwable) {
58 cause.addSuppressed(e)
59 }
60 handleUndeliverableException(cause, context)
61 }
62 }
63
64 /** @suppress */
65 @Deprecated(
66 message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe",
67 level = DeprecationLevel.HIDDEN,
68 replaceWith = ReplaceWith("rxMaybe(context, block)")
69 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
rxMaybenull70 public fun <T> CoroutineScope.rxMaybe(
71 context: CoroutineContext = EmptyCoroutineContext,
72 block: suspend CoroutineScope.() -> T?
73 ): Maybe<T> = rxMaybeInternal(this, context, block)
74