• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.internal.*
13 
/**
 * Opens a subscription to this [MaybeSource] and exposes whatever it emits through a [ReceiveChannel].
 * To unsubscribe from the source, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
 *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
 */
@ObsoleteCoroutinesApi
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> =
    // The channel itself is the observer: create it, subscribe it to the source, hand it back.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
28 
/**
 * Opens a subscription to this [ObservableSource] and exposes whatever it emits through a [ReceiveChannel].
 * To unsubscribe from the source, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
 *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
 */
@ObsoleteCoroutinesApi
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> =
    // The channel itself is the observer: create it, subscribe it to the source, hand it back.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
43 
// Deprecated in favour of collect: to be promoted to error in 1.3.0 and removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
    // Subscribe first, then run the action over everything the channel delivers.
    val subscription = openSubscription()
    subscription.consumeEach(action)
}
48 
// Deprecated in favour of collect: to be promoted to error in 1.3.0 and removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
    // Subscribe first, then run the action over everything the channel delivers.
    val subscription = openSubscription()
    subscription.consumeEach(action)
}
53 
/**
 * Subscribes to this [MaybeSource] and invokes [action] for each element received from it.
 * The subscription is cancelled if an exception is thrown while collecting.
 */
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit) {
    openSubscription().consumeEach(action)
}
61 
/**
 * Subscribes to this [ObservableSource] and invokes [action] for each element received from it.
 * The subscription is cancelled if an exception is thrown while collecting.
 */
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) {
    openSubscription().consumeEach(action)
}
69 
/**
 * A channel that is also an RxJava [Observer] and [MaybeObserver].
 *
 * Items delivered by the source are offered to the channel, and every terminal event
 * (`onSuccess`, `onComplete`, `onError`) closes it. The upstream [Disposable] received in
 * [onSubscribe] is disposed from [onClosedIdempotent].
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
{
    // Upstream subscription handle; reset to null once it has been disposed.
    private val _subscription = atomic<Disposable?>(null)

    // NOTE(review): presumably invoked by the base channel once when it gets closed or cancelled — confirm.
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
        _subscription.getAndSet(null)?.dispose() // dispose exactly once
    }

    // Observer / MaybeObserver overrides
    override fun onSubscribe(sub: Disposable) {
        _subscription.value = sub
    }

    /**
     * [MaybeObserver] terminal event carrying the single value.
     * `onComplete` is not delivered after `onSuccess`, so the channel has to be closed here;
     * otherwise a consumer iterating the channel would wait forever for more elements.
     */
    override fun onSuccess(t: T) {
        offer(t)
        close(cause = null)
    }

    override fun onNext(t: T) {
        offer(t)
    }

    // Normal completion: close without a cause.
    override fun onComplete() {
        close(cause = null)
    }

    // Failure: close with the upstream error as the cause.
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
102