• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx2
2 
3 import io.reactivex.*
4 import io.reactivex.disposables.*
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9 
/**
 * Subscribes to this [MaybeSource] and invokes [action] on the element it emits, if there is one.
 *
 * An exception thrown by [action], as well as an error signalled by the [MaybeSource], is rethrown from
 * [collect].
 */
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
    toChannel().consume {
        // `consume` takes care of cancelling the channel once this block finishes, normally or not.
        for (element in this) action(element)
    }
18 
/**
 * Subscribes to this [ObservableSource] and invokes [action] on every element it emits.
 *
 * When [action] throws, the subscription is cancelled and the exception is rethrown from [collect].
 * When the [ObservableSource] signals an error, that error is rethrown from [collect] as well.
 */
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
    toChannel().consume {
        // `consume` takes care of cancelling the channel once this block finishes, normally or not.
        for (element in this) action(element)
    }
27 
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [MaybeSource] and returns it
 * as a [ReceiveChannel]. Published so that the public inline `collect` can call it.
 */
@PublishedApi
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
34 
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [ObservableSource] and returns it
 * as a [ReceiveChannel]. Published so that the public inline `collect` can call it.
 */
@PublishedApi
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
41 
/**
 * Channel with unlimited capacity that doubles as an Rx [Observer] and [MaybeObserver]:
 * every signal received from the source is forwarded into the channel, and the
 * [Disposable] handed over in [onSubscribe] is disposed when the channel gets closed.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T>, MaybeObserver<T>
{
    // Disposable of the active subscription; reset to null once it has been disposed.
    private val _subscription = atomic<Disposable?>(null)

    // Close hook inherited from BufferedChannel (presumably invoked once per channel, as its name suggests).
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        // Atomically take the disposable out so that it is disposed exactly once.
        val pending = _subscription.getAndSet(null)
        pending?.dispose()
    }

    // ---- Callbacks shared by Observer and MaybeObserver ----

    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    override fun onError(e: Throwable) {
        // The source failed: close the channel with that failure as the cause.
        close(e)
    }

    override fun onComplete() {
        // The source finished normally: close the channel without a cause.
        close()
    }

    // ---- Observer ----

    override fun onNext(t: T & Any) {
        trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
    }

    // ---- MaybeObserver ----

    override fun onSuccess(t: T & Any) {
        // A Maybe delivers a single value: forward it first, then close the channel normally.
        trySend(t)
        close()
    }
}
75 
/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    // Hidden legacy entry point: subscribe a new channel to this source and hand it out.
    SubscriptionChannel<T>().apply { this@openSubscription.subscribe(this) }
83 
/** @suppress */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    // Hidden legacy entry point: subscribe a new channel to this source and hand it out.
    SubscriptionChannel<T>().apply { this@openSubscription.subscribe(this) }
91