• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.exceptions.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.internal.*
13 import kotlinx.coroutines.selects.*
14 import kotlinx.coroutines.sync.*
15 import kotlin.coroutines.*
16 
/**
 * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
 * Every time the returned observable is subscribed, it starts a new coroutine.
 *
 * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
 * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
 * if coroutine throws an exception or closes channel with a cause.
 * Unsubscribing cancels running coroutine.
 *
 * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
 * Note that Rx 2.x [Observable] **does not support backpressure**.
 *
 * Coroutine context can be specified with [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @param context additional coroutine context for the producer coroutine; must not contain a [Job].
 * @param block the producer body, run with a [ProducerScope] receiver on every subscription.
 * @return a cold [Observable] backed by the coroutine.
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun <T : Any> rxObservable(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> {
    require(context[Job] === null) {
        // Note the trailing space: the two sentences are concatenated into one message.
        "Observable context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return rxObservableInternal(GlobalScope, context, block)
}
41 
/**
 * Shared implementation behind the `rxObservable` builders.
 *
 * On every subscription it derives a new coroutine context from [scope] and [context], creates an
 * [RxObservableCoroutine] bound to the subscriber, registers cancellation of that coroutine with the
 * subscriber, and then starts [block] in the coroutine.
 */
private fun <T : Any> rxObservableInternal(
    scope: CoroutineScope, // support for legacy rxObservable in scope
    context: CoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
    val newContext = scope.newCoroutineContext(context)
    val coroutine = RxObservableCoroutine(newContext, subscriber)
    // Do it first (before starting the coroutine), to avoid unnecessary suspensions.
    subscriber.setCancellable(RxCancellable(coroutine))
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
52 
// States of RxObservableCoroutine._signal.
// OPEN -> CLOSED happens via compareAndSet in signalCompleted; SIGNALLED is written by doLockedSignalCompleted.
private const val OPEN = 0        // open channel, still working
private const val CLOSED = -1     // closed, but have not signalled onComplete/onError yet
private const val SIGNALLED = -2  // already signalled subscriber onComplete/onError
56 
/**
 * Coroutine that backs an Rx [Observable]: it is both the running coroutine and the [ProducerScope]
 * (and [SendChannel]) that the producer block sends into.
 *
 * Every call into [subscriber] (`onNext`, `onComplete`, `onError`) is made while [mutex] is held, so the
 * callbacks are never invoked concurrently. The terminal callback is issued at most once; this is tracked
 * by [_signal].
 *
 * @param parentContext the context the coroutine is created in.
 * @param subscriber the Rx emitter that receives the values and the terminal event.
 */
private class RxObservableCoroutine<T : Any>(
    parentContext: CoroutineContext,
    private val subscriber: ObservableEmitter<T>
) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T> {
    override val channel: SendChannel<T> get() = this

    // One of OPEN, CLOSED, SIGNALLED.
    private val _signal = atomic(OPEN)

    // The channel is considered closed for send as soon as the coroutine is no longer active.
    override val isClosedForSend: Boolean get() = !isActive

    // Closing the channel is implemented as cancelling the coroutine with the given cause.
    override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)

    override fun invokeOnClose(handler: (Throwable?) -> Unit) =
        throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")

    // Mutex is locked while subscriber.onXXX is being invoked
    private val mutex: Mutex = Mutex()

    @Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER")
    override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = RxObservableCoroutine<*>::registerSelectForSend as RegistrationFunction,
        processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction
    )

    @Suppress("UNUSED_PARAMETER")
    private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        // Try to acquire the mutex and complete in the registration phase.
        if (mutex.tryLock()) {
            select.selectInRegistrationPhase(Unit)
            return
        }
        // Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
        // Please note that at the point of the `trySelect(..)` invocation the corresponding
        // `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
        // In this case, the `onSend` clause will be re-registered, which alongside with the mutex
        // manipulation makes the resulting solution obstruction-free.
        launch {
            mutex.lock()
            if (!select.trySelect(this@RxObservableCoroutine, Unit)) {
                // The select was not won by this clause: release the mutex acquired on its behalf.
                mutex.unlock()
            }
        }
    }

    // Invoked once the `onSend` clause is selected; the mutex is already held at this point
    // (acquired in registerSelectForSend), so the element can be delivered right away.
    @Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
    private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
        doLockedNext(element as T)?.let { throw it }
        return this@RxObservableCoroutine
    }

    /**
     * Non-suspending send: fails immediately if the mutex is busy, otherwise delivers [element]
     * and reports either success or the reason the channel is closed.
     */
    override fun trySend(element: T): ChannelResult<Unit> =
        if (!mutex.tryLock()) {
            ChannelResult.failure()
        } else {
            when (val throwable = doLockedNext(element)) {
                null -> ChannelResult.success(Unit)
                else -> ChannelResult.closed(throwable)
            }
        }

    /**
     * Suspends until the mutex is acquired, then delivers [element] to the subscriber.
     * Throws the throwable returned by [doLockedNext] when delivery was not possible.
     */
    override suspend fun send(element: T) {
        mutex.lock()
        doLockedNext(element)?.let { throw it }
    }

    /**
     * Delivers [elem] to `subscriber.onNext`. Must be called with [mutex] locked; the mutex is released
     * on every path before this function returns.
     *
     * @return `null` if the element was delivered, otherwise the throwable explaining why it was not.
     */
    // assert: mutex.isLocked()
    private fun doLockedNext(elem: T): Throwable? {
        // check if already closed for send
        if (!isActive) {
            // doLockedSignalCompleted releases the mutex in its `finally` block.
            doLockedSignalCompleted(completionCause, completionCauseHandled)
            return getCancellationException()
        }
        // notify subscriber
        try {
            subscriber.onNext(elem)
        } catch (e: Throwable) {
            val cause = UndeliverableException(e)
            val causeDelivered = close(cause)
            unlockAndCheckCompleted()
            return if (causeDelivered) {
                // `cause` is the reason this channel is closed
                cause
            } else {
                // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
                handleUndeliverableException(cause, context)
                getCancellationException()
            }
        }
        /*
         * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
         * happen after this check and before `unlock` (see signalCompleted that does not do anything
         * if it fails to acquire the lock that we are still holding).
         * We have to recheck `isCompleted` after `unlock` anyway.
         */
        unlockAndCheckCompleted()
        return null
    }

    /**
     * Releases [mutex] and then re-checks the coroutine state: if it became inactive while the mutex was
     * held (so [signalCompleted] could not acquire it), the terminal event is signalled from here.
     */
    private fun unlockAndCheckCompleted() {
        mutex.unlock()
        // recheck isActive
        if (!isActive && mutex.tryLock())
            doLockedSignalCompleted(completionCause, completionCauseHandled)
    }

    /**
     * Signals the terminal event to the subscriber at most once. Must be called with [mutex] locked;
     * the mutex is always released in the `finally` block.
     *
     * @param cause the completion cause, or `null` for normal completion.
     * @param handled whether the cause is reported as already handled (forwarded from `onCancelled`
     * or read from `completionCauseHandled`).
     */
    // assert: mutex.isLocked()
    private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
        // cancellation failures
        try {
            if (_signal.value == SIGNALLED)
                return
            _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
            @Suppress("INVISIBLE_MEMBER")
            val unwrappedCause = cause?.let { unwrap(it) }
            if (unwrappedCause == null) {
                try {
                    subscriber.onComplete()
                } catch (e: Exception) {
                    handleUndeliverableException(e, context)
                }
            } else if (unwrappedCause is UndeliverableException && !handled) {
                /* Such exceptions are not reported to `onError`, as, according to the reactive specifications,
                 * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
                 * cancelled. */
                handleUndeliverableException(cause, context)
            } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
                try {
                    /* If the subscriber is already in a terminal state, the error will be signalled to
                     * `RxJavaPlugins.onError`. */
                    subscriber.onError(cause)
                } catch (e: Exception) {
                    cause.addSuppressed(e)
                    handleUndeliverableException(cause, context)
                }
            }
        } finally {
            mutex.unlock()
        }
    }

    /**
     * Entry point for coroutine completion/cancellation. Moves [_signal] from OPEN to CLOSED and, if the
     * mutex is free, signals the terminal event right away. If the mutex is busy, nothing more is done here;
     * the current holder re-checks the state after unlocking (see [unlockAndCheckCompleted]).
     */
    private fun signalCompleted(cause: Throwable?, handled: Boolean) {
        if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
        if (mutex.tryLock()) // if we can acquire the lock
            doLockedSignalCompleted(cause, handled)
    }

    override fun onCompleted(value: Unit) {
        signalCompleted(null, false)
    }

    override fun onCancelled(cause: Throwable, handled: Boolean) {
        signalCompleted(cause, handled)
    }
}
210 
/**
 * Legacy scoped variant of the top-level `rxObservable` builder, kept as a hidden declaration.
 * It builds the observable in the receiver scope instead of [GlobalScope].
 *
 * @suppress
 */
@Deprecated(
    message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("rxObservable(context, block)")
) // Deprecated since 1.3.0; the declaration is now hidden
public fun <T : Any> CoroutineScope.rxObservable(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = rxObservableInternal(this, context, block)
221