• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.*
6 import org.reactivestreams.*
7 
/**
 * Subscribes to this [Publisher] and invokes [action] on every element it emits.
 *
 * An exception thrown by [action] cancels the subscription and is rethrown from [collect].
 * An error signalled by the publisher is likewise rethrown from [collect].
 */
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) {
    // Bridge the publisher into a channel, then drain it element by element.
    val elements = toChannel()
    elements.consumeEach(action)
}
16 
/**
 * Creates a [SubscriptionChannel] configured with [request], subscribes it to this [Publisher],
 * and returns it as a [ReceiveChannel].
 */
@PublishedApi
internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> =
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
23 
/**
 * An unlimited-capacity channel that is also a reactive-streams [Subscriber]: elements delivered
 * through [onNext] are sent into the channel, and demand is signalled to the upstream
 * [Subscription] based on the [request] size and the receivers that get enqueued.
 *
 * @param request the demand level kept in [_requested]; must be non-negative.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
private class SubscriptionChannel<T>(
    private val request: Int
) : BufferedChannel<T>(capacity = Channel.UNLIMITED), Subscriber<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // Upstream subscription: null until onSubscribe stores one, reset to null by onClosedIdempotent.
    private val _subscription = atomic<Subscription?>(null)

    // Outstanding demand: amount requested from the subscription, minus elements received,
    // minus receivers enqueued. May go negative when receivers arrive before the subscription.
    private val _requested = atomic(0)

    // --------------------- BufferedChannel overrides -------------------------------

    /**
     * Accounts for one more enqueued receiver. When a subscription is present and the counter
     * would drop below zero, the counter is set to [request] and the shortfall is requested upstream.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveEnqueued() {
        _requested.loop { current ->
            val upstream = _subscription.value
            val afterReceiver = current - 1
            if (upstream == null || afterReceiver >= 0) {
                // No subscription yet, or demand is still sufficient: only record the receiver.
                if (_requested.compareAndSet(current, afterReceiver)) return
                return@loop // lost the race, retry with a fresh value
            }
            // Demand would become negative: move the counter to `request` (unless it is
            // already there) and ask upstream for the difference.
            if (current != request && !_requested.compareAndSet(current, request)) {
                return@loop // lost the race, retry with a fresh value
            }
            upstream.request((request - afterReceiver).toLong())
            return
        }
    }

    /** Reverts the accounting of one receiver by incrementing the demand counter. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /** Cancels the stored subscription, if any; the atomic swap to null makes this happen once. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        val upstream = _subscription.getAndSet(null)
        upstream?.cancel()
    }

    // --------------------- Subscriber overrides -------------------------------

    /**
     * Stores the subscription [s], then either cancels it (channel already closed for send)
     * or raises the demand counter to [request], requesting the difference from [s].
     */
    override fun onSubscribe(s: Subscription) {
        _subscription.value = s
        while (true) { // lock-free retry loop over _requested
            if (isClosedForSend) {
                s.cancel()
                return
            }
            val current = _requested.value
            // Demand is already at or above the target: nothing to request.
            if (current >= request) return
            // Receivers arrived before the subscription, or the initial request is still due.
            if (_requested.compareAndSet(current, request)) {
                s.request((request - current).toLong())
                return
            }
            // CAS failed: re-read and try again.
        }
    }

    /** Records one received element against the demand counter and forwards it to the channel. */
    override fun onNext(t: T) {
        _requested.decrementAndGet()
        // The result is deliberately ignored: this is expected to race with cancellation.
        trySend(t)
    }

    /** Closes the channel without a cause. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Closes the channel with [e] as the cause. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
97 
/** @suppress */
@Deprecated(
    message = "Transforming publisher to channel is deprecated, use asFlow() instead",
    level = DeprecationLevel.HIDDEN
) // ERROR in 1.4, HIDDEN in 1.6.0
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> =
    // Build the subscriber channel, subscribe it to this publisher, and hand it back.
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
107