<lambda>null1 package kotlinx.coroutines.reactive
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.*
6 import org.reactivestreams.*
7
/**
 * Subscribes to this [Publisher] and invokes [action] for every element it emits.
 *
 * The publisher is consumed through a channel obtained from [toChannel] and drained with `consumeEach`.
 * If [action] throws, the subscription is cancelled and the exception is rethrown from [collect];
 * if the publisher signals an error, that error is rethrown from [collect] as well.
 *
 * @param action callback applied to each received element.
 */
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit {
    val elements = toChannel()
    elements.consumeEach(action)
}
16
/**
 * Creates a [SubscriptionChannel] configured with [request], subscribes it to this [Publisher],
 * and returns it as a [ReceiveChannel].
 */
@PublishedApi
internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> =
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
23
/**
 * A [Subscriber] backed by an unlimited-capacity [BufferedChannel]: every element delivered to [onNext] is sent
 * into the channel, while [onComplete] and [onError] close it (without and with a cause, respectively).
 *
 * Demand is signalled to the upstream [Subscription] from [onSubscribe] and [onReceiveEnqueued]; both use
 * [request] as the value the internal demand counter is reset to when more elements are requested.
 *
 * @param request value the demand counter is reset to on each upstream request; must be non-negative,
 * otherwise an [IllegalArgumentException] is thrown on construction.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
private class SubscriptionChannel<T>(
    private val request: Int
) : BufferedChannel<T>(capacity = Channel.UNLIMITED), Subscriber<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // Upstream subscription: stored by onSubscribe, swapped back to null (and cancelled) by onClosedIdempotent.
    private val _subscription = atomic<Subscription?>(null)

    // Demand counter: requested from the subscription, minus elements received, minus enqueued receivers.
    // May go negative when receivers arrive before a subscription is available.
    private val _requested = atomic(0)

    // --------------------- BufferedChannel overrides -------------------------------

    /**
     * Accounts for one more enqueued receiver (decrements the demand counter) and, when a subscription is present
     * and the counter would drop below zero, resets the counter to [request] and asks upstream for the shortfall.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveEnqueued() {
        while (true) { // lock-free retry loop on _requested
            val observed = _requested.value
            val upstream = _subscription.value
            val afterEnqueue = observed - 1
            if (upstream == null || afterEnqueue >= 0) {
                // Nothing to request (yet): only record the new receiver, retrying if the counter moved.
                if (_requested.compareAndSet(observed, afterEnqueue)) return
                continue
            }
            // Demand is exhausted: move the counter to `request` (the CAS is skipped when it already holds
            // that value) and retry from scratch if another thread changed the counter in the meantime.
            if (observed != request && !_requested.compareAndSet(observed, request)) continue
            upstream.request((request - afterEnqueue).toLong())
            return
        }
    }

    /** Reverts the book-keeping of [onReceiveEnqueued] by incrementing the demand counter. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /** Cancels the current subscription, if any; the atomic swap to `null` ensures it is cancelled only once here. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        val previous = _subscription.getAndSet(null)
        previous?.cancel()
    }

    // --------------------- Subscriber overrides -------------------------------

    /**
     * Stores [s] and, unless the channel is already closed for send (in which case [s] is cancelled),
     * raises the demand counter to [request] and requests the difference from [s].
     */
    override fun onSubscribe(s: Subscription) {
        _subscription.value = s
        while (!isClosedForSend) { // lock-free retry loop on _requested
            val observed = _requested.value
            if (observed >= request) return // enough demand is already outstanding
            // Either receivers arrived before the subscription did, or the initial request is still due.
            if (_requested.compareAndSet(observed, request)) {
                s.request((request - observed).toLong())
                return
            }
            // CAS lost a race: re-check the closed state and try again.
        }
        s.cancel()
    }

    /** Consumes one unit of demand and sends [t] into the channel. */
    override fun onNext(t: T) {
        _requested.decrementAndGet()
        // The result is deliberately ignored: a failed send is an expected race with cancellation.
        trySend(t)
    }

    /** Closes the channel without a cause. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Closes the channel with [e] as the cause. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
97
/** @suppress */
@Deprecated(
    message = "Transforming publisher to channel is deprecated, use asFlow() instead",
    level = DeprecationLevel.HIDDEN) // ERROR in 1.4, HIDDEN in 1.6.0
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> =
    // Same steps as toChannel: create a SubscriptionChannel with `request`, subscribe it, return it.
    toChannel(request)
107