1 /*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.internal.*
13
14 /**
15 * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
16 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
17 *
18 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
19 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
20 */
21 @ObsoleteCoroutinesApi
22 @Suppress("CONFLICTING_OVERLOADS")
openSubscriptionnull23 public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
24 val channel = SubscriptionChannel<T>()
25 subscribe(channel)
26 return channel
27 }
28
29 /**
30 * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
31 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
32 *
33 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
34 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
35 */
36 @ObsoleteCoroutinesApi
37 @Suppress("CONFLICTING_OVERLOADS")
openSubscriptionnull38 public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
39 val channel = SubscriptionChannel<T>()
40 subscribe(channel)
41 return channel
42 }
43
44 // Will be promoted to error in 1.3.0, removed in 1.4.0
45 @Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
consumeEachnull46 public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
47 openSubscription().consumeEach(action)
48
49 // Will be promoted to error in 1.3.0, removed in 1.4.0
50 @Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
51 public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
52 openSubscription().consumeEach(action)
53
54 /**
55 * Subscribes to this [MaybeSource] and performs the specified action for each received element.
56 * Cancels subscription if any exception happens during collect.
57 */
58 @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
59 public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit) =
60 openSubscription().consumeEach(action)
61
62 /**
63 * Subscribes to this [ObservableSource] and performs the specified action for each received element.
64 * Cancels subscription if any exception happens during collect.
65 */
66 @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
67 public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) =
68 openSubscription().consumeEach(action)
69
70 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
71 private class SubscriptionChannel<T> :
72 LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
73 {
74 private val _subscription = atomic<Disposable?>(null)
75
76 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
77 override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
78 _subscription.getAndSet(null)?.dispose() // dispose exactly once
79 }
80
81 // Observer overrider
82 override fun onSubscribe(sub: Disposable) {
83 _subscription.value = sub
84 }
85
86 override fun onSuccess(t: T) {
87 offer(t)
88 }
89
90 override fun onNext(t: T) {
91 offer(t)
92 }
93
94 override fun onComplete() {
95 close(cause = null)
96 }
97
98 override fun onError(e: Throwable) {
99 close(cause = e)
100 }
101 }
102