• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import org.reactivestreams.*
12 
13 /**
14  * Subscribes to this [Publisher] and performs the specified action for each received element.
15  *
16  * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
17  * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
18  */
19 public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
20     toChannel().consumeEach(action)
21 
22 @PublishedApi
23 internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> {
24     val channel = SubscriptionChannel<T>(request)
25     subscribe(channel)
26     return channel
27 }
28 
29 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
30 private class SubscriptionChannel<T>(
31     private val request: Int
32 ) : LinkedListChannel<T>(null), Subscriber<T> {
33     init {
<lambda>null34         require(request >= 0) { "Invalid request size: $request" }
35     }
36 
37     private val _subscription = atomic<Subscription?>(null)
38 
39     // requested from subscription minus number of received minus number of enqueued receivers,
40     // can be negative if we have receivers, but no subscription yet
41     private val _requested = atomic(0)
42 
43     // --------------------- AbstractChannel overrides -------------------------------
44     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveEnqueuednull45     override fun onReceiveEnqueued() {
46         _requested.loop { wasRequested ->
47             val subscription = _subscription.value
48             val needRequested = wasRequested - 1
49             if (subscription != null && needRequested < 0) { // need to request more from subscription
50                 // try to fixup by making request
51                 if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
52                     return@loop // continue looping if failed
53                 subscription.request((request - needRequested).toLong())
54                 return
55             }
56             // just do book-keeping
57             if (_requested.compareAndSet(wasRequested, needRequested)) return
58         }
59     }
60 
61     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveDequeuednull62     override fun onReceiveDequeued() {
63         _requested.incrementAndGet()
64     }
65 
66     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onClosedIdempotentnull67     override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
68         _subscription.getAndSet(null)?.cancel() // cancel exactly once
69     }
70 
71     // --------------------- Subscriber overrides -------------------------------
onSubscribenull72     override fun onSubscribe(s: Subscription) {
73         _subscription.value = s
74         while (true) { // lock-free loop on _requested
75             if (isClosedForSend) {
76                 s.cancel()
77                 return
78             }
79             val wasRequested = _requested.value
80             if (wasRequested >= request) return // ok -- normal story
81             // otherwise, receivers came before we had subscription or need to make initial request
82             // try to fixup by making request
83             if (!_requested.compareAndSet(wasRequested, request)) continue
84             s.request((request - wasRequested).toLong())
85             return
86         }
87     }
88 
onNextnull89     override fun onNext(t: T) {
90         _requested.decrementAndGet()
91         trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
92     }
93 
onCompletenull94     override fun onComplete() {
95         close(cause = null)
96     }
97 
onErrornull98     override fun onError(e: Throwable) {
99         close(cause = e)
100     }
101 }
102 
103 /** @suppress */
104 @Deprecated(
105     message = "Transforming publisher to channel is deprecated, use asFlow() instead",
106     level = DeprecationLevel.HIDDEN) // ERROR in 1.4, HIDDEN in 1.6.0
openSubscriptionnull107 public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
108     val channel = SubscriptionChannel<T>(request)
109     subscribe(channel)
110     return channel
111 }
112