/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*

/**
 * Subscribes to this [MaybeSource] and performs the specified action for each received element.
 *
 * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown
 * from [collect].
 *
 * @param action callback invoked for each element received from the source.
 */
public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit): Unit =
    toChannel().consumeEach(action)

/**
 * Subscribes to this [ObservableSource] and performs the specified action for each received element.
 *
 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
 * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
 *
 * @param action callback invoked for each element received from the source.
 */
public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit): Unit =
    toChannel().consumeEach(action)

/**
 * Subscribes a fresh [SubscriptionChannel] to this [MaybeSource] and returns it as a [ReceiveChannel].
 *
 * Declared `@PublishedApi internal` because it is called from the public inline [collect].
 */
@PublishedApi
internal fun <T> MaybeSource<T & Any>.toChannel(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

/**
 * Subscribes a fresh [SubscriptionChannel] to this [ObservableSource] and returns it as a [ReceiveChannel].
 *
 * Declared `@PublishedApi internal` because it is called from the public inline [collect].
 */
@PublishedApi
internal fun <T> ObservableSource<T & Any>.toChannel(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

/**
 * A [BufferedChannel] with [Channel.UNLIMITED] capacity that is also an Rx [Observer] and [MaybeObserver]:
 * items delivered by the source are sent into the channel, and terminal signals close it.
 *
 * The [Disposable] received in [onSubscribe] is kept in an atomic reference and disposed when
 * [onClosedIdempotent] runs.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T & Any>, MaybeObserver<T & Any> {

    // Upstream subscription handle; null before onSubscribe and after it has been disposed.
    private val _subscription = atomic<Disposable?>(null)

    /**
     * Disposes the upstream subscription, if one is stored.
     *
     * `getAndSet(null)` hands the stored [Disposable] to a single caller only, so it is disposed exactly once.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        _subscription.getAndSet(null)?.dispose() // dispose exactly once
    }

    // Observer / MaybeObserver overrides

    /** Stores the subscription handle so that it can be disposed from [onClosedIdempotent]. */
    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    /** [MaybeObserver] success signal: sends the single value, then closes the channel without a cause. */
    override fun onSuccess(t: T & Any) {
        trySend(t)
        close(cause = null)
    }

    /** [Observer] item signal: sends the value into the channel. */
    override fun onNext(t: T & Any) {
        trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
    }

    /** Completion signal: closes the channel without a cause. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Error signal: closes the channel with [e] as the cause. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}

/** @suppress */
// ERROR in 1.4.0, HIDDEN in 1.6.0
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN)
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

/** @suppress */
// ERROR in 1.4.0, HIDDEN in 1.6.0
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN)
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}