/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

/**
 * Subscribes to this [MaybeSource] and returns a channel from which the emitted element can be received.
 * [Cancel][ReceiveChannel.cancel] the returned channel to unsubscribe from this source.
 *
 * This API is internal in favour of [Flow].
 * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
 */
@PublishedApi
internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { subscribe(it) }

/**
 * Subscribes to this [ObservableSource] and returns a channel from which the emitted elements can be received.
 * [Cancel][ReceiveChannel.cancel] the returned channel to unsubscribe from this source.
 *
 * This API is internal in favour of [Flow].
 * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
 */
@PublishedApi
internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { subscribe(it) }

/**
 * Subscribes to this [MaybeSource] and invokes [action] for each element it delivers.
 *
 * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown
 * from [collect].
 */
public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit) {
    openSubscription().consumeEach(action)
}

/**
 * Subscribes to this [ObservableSource] and invokes [action] for each element it delivers.
 *
 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
 * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
 */
public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit) {
    openSubscription().consumeEach(action)
}

/**
 * An unlimited-capacity channel that is also an Rx [Observer] and [MaybeObserver]: every Rx signal it receives is
 * forwarded into the channel, and the stored [Disposable] is disposed from [onClosedIdempotent].
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED),
    Observer<T & Any>,
    MaybeObserver<T & Any> {

    // Disposable handed over in onSubscribe; reset to null once it has been disposed.
    private val _subscription = atomic<Disposable?>(null)

    // NOTE(review): presumably invoked by BufferedChannel when the channel gets closed or cancelled -- confirm.
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        // The atomic swap hands the disposable to a single caller, so it is disposed exactly once.
        val current = _subscription.getAndSet(null)
        current?.dispose()
    }

    // Observer / MaybeObserver callbacks

    // NOTE(review): a disposable stored here after onClosedIdempotent has already run is not disposed by this
    // class -- confirm that sources always deliver onSubscribe before the channel can be closed.
    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    // MaybeObserver: the single value is also the terminal signal, so close right after sending it.
    override fun onSuccess(t: T & Any) {
        trySend(t)
        close()
    }

    override fun onNext(t: T & Any) {
        // Safe to ignore the return value here, expectedly racing with cancellation.
        trySend(t)
    }

    // Normal completion: close without a cause.
    override fun onComplete() {
        close()
    }

    // Failure: close with the upstream error as the cause.
    override fun onError(e: Throwable) {
        close(e)
    }
}