1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13
/**
 * Subscribes to this [MaybeSource] and invokes [action] for every element it delivers.
 *
 * The source is bridged into a channel via [toChannel] and drained with `consumeEach`.
 * An exception thrown by [action], or an error raised by the [MaybeSource], is rethrown from [collect].
 */
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit {
    val elements = toChannel()
    elements.consumeEach(action)
}
22
/**
 * Subscribes to this [ObservableSource] and invokes [action] for every element it delivers.
 *
 * The source is bridged into a channel via [toChannel] and drained with `consumeEach`.
 * If [action] throws, the subscription is cancelled and the exception is rethrown from [collect].
 * If the [ObservableSource] signals an error, that error is likewise rethrown from [collect].
 */
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit {
    val elements = toChannel()
    elements.consumeEach(action)
}
31
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [MaybeSource] and returns it
 * as a [ReceiveChannel]. The subscription is made before the channel is handed to the caller.
 */
@PublishedApi
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
38
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [ObservableSource] and returns it
 * as a [ReceiveChannel]. The subscription is made before the channel is handed to the caller.
 */
@PublishedApi
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
45
/**
 * A channel with [Channel.UNLIMITED] capacity that also acts as an RxJava [Observer] and
 * [MaybeObserver]: emitted items are sent into the channel, and terminal signals close it.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T>, MaybeObserver<T>
{
    // Disposable received in onSubscribe; reset to null at the moment it gets disposed.
    private val _disposable = atomic<Disposable?>(null)

    // Observer / MaybeObserver callbacks

    override fun onSubscribe(sub: Disposable) {
        _disposable.value = sub
    }

    override fun onNext(t: T & Any) {
        // The result is deliberately ignored: this call is expected to race with cancellation.
        trySend(t)
    }

    override fun onSuccess(t: T & Any) {
        // Deliver the single value first, then close the channel without a cause.
        trySend(t)
        close()
    }

    override fun onError(e: Throwable) {
        // The upstream error becomes the close cause of the channel.
        close(e)
    }

    override fun onComplete() {
        close()
    }

    // NOTE(review): presumably invoked by BufferedChannel once the channel gets closed — confirm.
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        // Atomically take the disposable out so that it is disposed exactly once.
        val pending = _disposable.getAndSet(null)
        pending?.dispose()
    }
}
79
/**
 * Subscribes a new [SubscriptionChannel] to this [ObservableSource] and returns it.
 * Hidden deprecated API.
 *
 * @suppress
 */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
87
/**
 * Subscribes a new [SubscriptionChannel] to this [MaybeSource] and returns it.
 * Hidden deprecated API.
 *
 * @suppress
 */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
95