• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 
13 /**
14  * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
15  * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
16  *
17  * This API is internal in the favour of [Flow].
18  * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
19  */
20 @PublishedApi
21 internal fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> {
22     val channel = SubscriptionChannel<T>()
23     subscribe(channel)
24     return channel
25 }
26 
27 /**
28  * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
29  * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
30  *
31  * This API is internal in the favour of [Flow].
32  * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
33  */
34 @PublishedApi
35 internal fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> {
36     val channel = SubscriptionChannel<T>()
37     subscribe(channel)
38     return channel
39 }
40 
41 /**
42  * Subscribes to this [MaybeSource] and performs the specified action for each received element.
43  *
44  * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
45  * [collect].
46  */
47 public suspend inline fun <T> MaybeSource<T & Any>.collect(action: (T) -> Unit): Unit =
48     openSubscription().consumeEach(action)
49 
50 /**
51  * Subscribes to this [ObservableSource] and performs the specified action for each received element.
52  *
53  * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
54  * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
55  */
56 public suspend inline fun <T> ObservableSource<T & Any>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action)
57 
58 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
59 private class SubscriptionChannel<T> :
60     BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T & Any>, MaybeObserver<T & Any>
61 {
62     private val _subscription = atomic<Disposable?>(null)
63 
64     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onClosedIdempotentnull65     override fun onClosedIdempotent() {
66         _subscription.getAndSet(null)?.dispose() // dispose exactly once
67     }
68 
69     // Observer overrider
onSubscribenull70     override fun onSubscribe(sub: Disposable) {
71         _subscription.value = sub
72     }
73 
74     override fun onSuccess(t: T & Any) {
75         trySend(t)
76         close(cause = null)
77     }
78 
79     override fun onNext(t: T & Any) {
80         trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
81     }
82 
onCompletenull83     override fun onComplete() {
84         close(cause = null)
85     }
86 
onErrornull87     override fun onError(e: Throwable) {
88         close(cause = e)
89     }
90 }
91