1 package kotlinx.coroutines.rx2
2
3 import io.reactivex.*
4 import io.reactivex.disposables.*
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9
10 /**
11 * Subscribes to this [MaybeSource] and performs the specified action for each received element.
12 *
13 * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
14 * [collect].
15 */
collectnull16 public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
17 toChannel().consumeEach(action)
18
19 /**
20 * Subscribes to this [ObservableSource] and performs the specified action for each received element.
21 *
22 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
23 * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
24 */
25 public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
26 toChannel().consumeEach(action)
27
28 @PublishedApi
29 internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
30 val channel = SubscriptionChannel<T>()
31 subscribe(channel)
32 return channel
33 }
34
35 @PublishedApi
toChannelnull36 internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
37 val channel = SubscriptionChannel<T>()
38 subscribe(channel)
39 return channel
40 }
41
42 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
43 private class SubscriptionChannel<T> :
44 BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T>, MaybeObserver<T>
45 {
46 private val _subscription = atomic<Disposable?>(null)
47
48 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onClosedIdempotentnull49 override fun onClosedIdempotent() {
50 _subscription.getAndSet(null)?.dispose() // dispose exactly once
51 }
52
53 // Observer overrider
onSubscribenull54 override fun onSubscribe(sub: Disposable) {
55 _subscription.value = sub
56 }
57
58 override fun onSuccess(t: T & Any) {
59 trySend(t)
60 close(cause = null)
61 }
62
63 override fun onNext(t: T & Any) {
64 trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
65 }
66
onCompletenull67 override fun onComplete() {
68 close(cause = null)
69 }
70
onErrornull71 override fun onError(e: Throwable) {
72 close(cause = e)
73 }
74 }
75
76 /** @suppress */
77 @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
78 public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
79 val channel = SubscriptionChannel<T>()
80 subscribe(channel)
81 return channel
82 }
83
84 /** @suppress */
85 @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
86 public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
87 val channel = SubscriptionChannel<T>()
88 subscribe(channel)
89 return channel
90 }
91