• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13 
/**
 * Subscribes to this [MaybeSource] and invokes [action] for each element it delivers.
 *
 * An exception thrown by [action], as well as an error raised by the [MaybeSource] itself, is rethrown from
 * [collect].
 */
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit {
    // Bridge the source into a channel first, then drain that channel through the action.
    val elements = toChannel()
    elements.consumeEach(action)
}
22 
/**
 * Subscribes to this [ObservableSource] and invokes [action] for each element it delivers.
 *
 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
 * [collect]. An error signalled by the [ObservableSource] is likewise rethrown from [collect].
 */
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit {
    // Bridge the source into a channel first, then drain that channel through the action.
    val elements = toChannel()
    elements.consumeEach(action)
}
31 
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [MaybeSource], and returns it
 * as a [ReceiveChannel].
 */
@PublishedApi
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
38 
/**
 * Creates a fresh [SubscriptionChannel], subscribes it to this [ObservableSource], and returns it
 * as a [ReceiveChannel].
 */
@PublishedApi
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
45 
/**
 * A channel with [Channel.UNLIMITED] capacity that doubles as an Rx [Observer] and [MaybeObserver]:
 * values delivered by the source are pushed into the channel with [trySend], and terminal signals
 * close the channel (with the error as the close cause for [onError]).
 *
 * The [Disposable] received in [onSubscribe] is kept so that it can be disposed when
 * [onClosedIdempotent] runs.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T>, MaybeObserver<T> {

    // Holds the active subscription; reset to null once it has been disposed.
    private val _disposable = atomic<Disposable?>(null)

    // ---- Observer / MaybeObserver callbacks ----

    override fun onSubscribe(sub: Disposable) {
        // NOTE(review): a Disposable stored here after the channel was already closed would not be
        // disposed by onClosedIdempotent — confirm sources always deliver onSubscribe first.
        _disposable.value = sub
    }

    override fun onNext(t: T & Any) {
        // Safe to ignore the result here: this is expected to race with cancellation.
        trySend(t)
    }

    override fun onSuccess(t: T & Any) {
        // A Maybe delivers a single value: publish it, then close the channel normally.
        trySend(t)
        close(null)
    }

    override fun onComplete() {
        close(null)
    }

    override fun onError(e: Throwable) {
        // The source's failure becomes the close cause of the channel.
        close(e)
    }

    // ---- Channel close hook ----

    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        // Swap the reference out atomically so each stored Disposable is disposed exactly once.
        val pending = _disposable.getAndSet(null) ?: return
        pending.dispose()
    }
}
79 
/**
 * Hidden legacy entry point: subscribes a new [SubscriptionChannel] to this [ObservableSource]
 * and returns it as a [ReceiveChannel].
 *
 * @suppress
 */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> ObservableSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
87 
/**
 * Hidden legacy entry point: subscribes a new [SubscriptionChannel] to this [MaybeSource]
 * and returns it as a [ReceiveChannel].
 *
 * @suppress
 */
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0
public fun <T> MaybeSource<T & Any>.openSubscription(): ReceiveChannel<T> =
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
95