/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*

/**
 * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
 *
 * This API is internal in favour of [Flow].
 * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
 */
@PublishedApi
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

/**
 * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
 *
 * This API is internal in favour of [Flow].
 * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
 */
@PublishedApi
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

/**
 * Subscribes to this [MaybeSource] and performs the specified [action] for each received element.
 * Cancels subscription if any exception happens during collect.
 */
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
    openSubscription().consumeEach(action)

/**
 * Subscribes to this [ObservableSource] and performs the specified [action] for each received element.
 * Cancels subscription if any exception happens during collect.
 */
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
    openSubscription().consumeEach(action)

/**
 * A channel that can be subscribed both as an [Observer] and as a [MaybeObserver].
 *
 * Every value delivered by the source is offered to the channel, and a terminal signal from the source
 * closes the channel. The [Disposable] handed over in [onSubscribe] is kept in an atomic reference and
 * is disposed from [onClosedIdempotent].
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
{
    // The active subscription, or `null` when none was received yet or it was already taken for disposal.
    private val _subscription = atomic<Disposable?>(null)

    /**
     * Disposes the stored subscription, if any.
     *
     * `getAndSet(null)` lets only one caller observe the non-null [Disposable],
     * so a stored subscription is disposed exactly once.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
        _subscription.getAndSet(null)?.dispose() // dispose exactly once
    }

    // Observer / MaybeObserver callbacks

    /** Remembers [sub] so that it can be disposed from [onClosedIdempotent]. */
    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    /**
     * Delivers the single value of a [MaybeSource] and then closes the channel.
     *
     * For a [MaybeObserver], `onSuccess` is a terminal signal: `onComplete` is not invoked after it.
     * Without closing here the channel would stay open forever and `consumeEach` would never return.
     */
    override fun onSuccess(t: T) {
        offer(t)
        close(cause = null)
    }

    /** Delivers the next value of an [ObservableSource] to the channel. */
    override fun onNext(t: T) {
        offer(t)
    }

    /** Closes the channel without a cause when the source completes. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Closes the channel with [e] as the cause when the source fails. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}