• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx3
2 
3 import io.reactivex.rxjava3.core.*
4 import io.reactivex.rxjava3.disposables.*
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 
/**
 * Opens a subscription to this [MaybeSource] and exposes whatever it emits through a [ReceiveChannel].
 * To unsubscribe from the source, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * This function is kept internal in favour of [Flow]-based APIs.
 * There is no [Flow] adapter for [MaybeSource], so it should be converted to an [Observable] first.
 *
 * @return the channel that has been subscribed to this source as its observer.
 */
@PublishedApi
internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    // The channel itself is the observer: create it, subscribe it, and hand it back.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
22 
/**
 * Opens a subscription to this [ObservableSource] and exposes the elements it emits through a [ReceiveChannel].
 * To unsubscribe from the source, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * This function is kept internal in favour of [Flow]-based APIs.
 *
 * NOTE(review): this comment used to say that [ObservableSource] has no corresponding [Flow] adapter and must be
 * transformed to [Observable] first; that wording looks copied from the [MaybeSource] overload — confirm against
 * the conversion functions this module provides.
 *
 * @return the channel that has been subscribed to this source as its observer.
 */
@PublishedApi
internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    // The channel itself is the observer: create it, subscribe it, and hand it back.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
36 
/**
 * Subscribes to this [MaybeSource] and invokes [action] for every element it delivers.
 *
 * An exception thrown by [action], as well as an error raised by the [MaybeSource] itself, is rethrown from
 * [collect].
 *
 * @param action callback invoked with each received element.
 */
public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit) {
    val subscription = openSubscription()
    subscription.consumeEach(action)
}
45 
/**
 * Subscribes to this [ObservableSource] and invokes [action] for every element it delivers.
 *
 * When [action] throws, the subscription is cancelled and the exception is rethrown from [collect].
 * An error signalled by the [ObservableSource] is rethrown from [collect] as well.
 *
 * @param action callback invoked with each received element.
 */
public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit) {
    val subscription = openSubscription()
    subscription.consumeEach(action)
}
53 
/**
 * Channel with [Channel.UNLIMITED] capacity that also acts as an RxJava [Observer] and [MaybeObserver]:
 * emitted values are sent into the channel and terminal signals close it.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T & Any>, MaybeObserver<T & Any>
{
    // Disposable handed over in onSubscribe; swapped back to null when it gets disposed.
    private val _subscription = atomic<Disposable?>(null)

    // Hook inherited from BufferedChannel (presumably run once when the channel is closed or cancelled — confirm).
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        // Take the disposable out atomically so that it is disposed exactly once.
        val upstream = _subscription.getAndSet(null)
        upstream?.dispose()
    }

    // Observer / MaybeObserver overrides
    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    override fun onNext(t: T & Any) {
        // The result is deliberately ignored: sending is expected to race with cancellation.
        trySend(t)
    }

    override fun onSuccess(t: T & Any) {
        // A Maybe produces at most one value: deliver it, then close the channel normally.
        trySend(t)
        close()
    }

    override fun onComplete() {
        close()
    }

    override fun onError(e: Throwable) {
        // Close with the upstream error as the cause.
        close(e)
    }
}
87