1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import org.reactivestreams.*
12
13 /**
14 * Subscribes to this [Publisher] and performs the specified action for each received element.
15 *
16 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
17 * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
18 */
19 public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
20 toChannel().consumeEach(action)
21
22 @PublishedApi
23 internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> {
24 val channel = SubscriptionChannel<T>(request)
25 subscribe(channel)
26 return channel
27 }
28
29 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
30 private class SubscriptionChannel<T>(
31 private val request: Int
32 ) : LinkedListChannel<T>(null), Subscriber<T> {
33 init {
<lambda>null34 require(request >= 0) { "Invalid request size: $request" }
35 }
36
37 private val _subscription = atomic<Subscription?>(null)
38
39 // requested from subscription minus number of received minus number of enqueued receivers,
40 // can be negative if we have receivers, but no subscription yet
41 private val _requested = atomic(0)
42
43 // --------------------- AbstractChannel overrides -------------------------------
44 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveEnqueuednull45 override fun onReceiveEnqueued() {
46 _requested.loop { wasRequested ->
47 val subscription = _subscription.value
48 val needRequested = wasRequested - 1
49 if (subscription != null && needRequested < 0) { // need to request more from subscription
50 // try to fixup by making request
51 if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
52 return@loop // continue looping if failed
53 subscription.request((request - needRequested).toLong())
54 return
55 }
56 // just do book-keeping
57 if (_requested.compareAndSet(wasRequested, needRequested)) return
58 }
59 }
60
61 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveDequeuednull62 override fun onReceiveDequeued() {
63 _requested.incrementAndGet()
64 }
65
66 @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onClosedIdempotentnull67 override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
68 _subscription.getAndSet(null)?.cancel() // cancel exactly once
69 }
70
71 // --------------------- Subscriber overrides -------------------------------
onSubscribenull72 override fun onSubscribe(s: Subscription) {
73 _subscription.value = s
74 while (true) { // lock-free loop on _requested
75 if (isClosedForSend) {
76 s.cancel()
77 return
78 }
79 val wasRequested = _requested.value
80 if (wasRequested >= request) return // ok -- normal story
81 // otherwise, receivers came before we had subscription or need to make initial request
82 // try to fixup by making request
83 if (!_requested.compareAndSet(wasRequested, request)) continue
84 s.request((request - wasRequested).toLong())
85 return
86 }
87 }
88
onNextnull89 override fun onNext(t: T) {
90 _requested.decrementAndGet()
91 trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
92 }
93
onCompletenull94 override fun onComplete() {
95 close(cause = null)
96 }
97
onErrornull98 override fun onError(e: Throwable) {
99 close(cause = e)
100 }
101 }
102
103 /** @suppress */
104 @Deprecated(
105 message = "Transforming publisher to channel is deprecated, use asFlow() instead",
106 level = DeprecationLevel.HIDDEN) // ERROR in 1.4, HIDDEN in 1.6.0
openSubscriptionnull107 public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
108 val channel = SubscriptionChannel<T>(request)
109 subscribe(channel)
110 return channel
111 }
112