1 package kotlinx.coroutines.rx3 2 3 import io.reactivex.rxjava3.core.* 4 import io.reactivex.rxjava3.disposables.* 5 import kotlinx.atomicfu.* 6 import kotlinx.coroutines.channels.* 7 import kotlinx.coroutines.flow.* 8 9 /** 10 * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. 11 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source. 12 * 13 * This API is internal in the favour of [Flow]. 14 * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. 15 */ 16 @PublishedApi 17 internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> { 18 val channel = SubscriptionChannel<T>() 19 subscribe(channel) 20 return channel 21 } 22 23 /** 24 * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it. 25 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source. 26 * 27 * This API is internal in the favour of [Flow]. 28 * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. 29 */ 30 @PublishedApi 31 internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> { 32 val channel = SubscriptionChannel<T>() 33 subscribe(channel) 34 return channel 35 } 36 37 /** 38 * Subscribes to this [MaybeSource] and performs the specified action for each received element. 39 * 40 * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from 41 * [collect]. 42 */ 43 public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit): Unit = 44 openSubscription().consumeEach(action) 45 46 /** 47 * Subscribes to this [ObservableSource] and performs the specified action for each received element. 48 * 49 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from 50 * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. 51 */ 52 public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) 53 54 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") 55 private class SubscriptionChannel<T> : 56 BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T & Any>, MaybeObserver<T & Any> 57 { 58 private val _subscription = atomic<Disposable?>(null) 59 60 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") onClosedIdempotentnull61 override fun onClosedIdempotent() { 62 _subscription.getAndSet(null)?.dispose() // dispose exactly once 63 } 64 65 // Observer overrider onSubscribenull66 override fun onSubscribe(sub: Disposable) { 67 _subscription.value = sub 68 } 69 70 override fun onSuccess(t: T & Any) { 71 trySend(t) 72 close(cause = null) 73 } 74 75 override fun onNext(t: T & Any) { 76 trySend(t) // Safe to ignore return value here, expectedly racing with cancellation 77 } 78 onCompletenull79 override fun onComplete() { 80 close(cause = null) 81 } 82 onErrornull83 override fun onError(e: Throwable) { 84 close(cause = e) 85 } 86 } 87