• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import org.reactivestreams.*
11 
12 /**
13  * Subscribes to this [Publisher] and performs the specified action for each received element.
14  *
15  * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
16  * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
17  */
18 public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
19     toChannel().consumeEach(action)
20 
21 @PublishedApi
22 internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> {
23     val channel = SubscriptionChannel<T>(request)
24     subscribe(channel)
25     return channel
26 }
27 
28 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
29 private class SubscriptionChannel<T>(
30     private val request: Int
31 ) : BufferedChannel<T>(capacity = Channel.UNLIMITED), Subscriber<T> {
32     init {
<lambda>null33         require(request >= 0) { "Invalid request size: $request" }
34     }
35 
36     private val _subscription = atomic<Subscription?>(null)
37 
38     // requested from subscription minus number of received minus number of enqueued receivers,
39     // can be negative if we have receivers, but no subscription yet
40     private val _requested = atomic(0)
41 
42     // --------------------- BufferedChannel overrides -------------------------------
43     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveEnqueuednull44     override fun onReceiveEnqueued() {
45         _requested.loop { wasRequested ->
46             val subscription = _subscription.value
47             val needRequested = wasRequested - 1
48             if (subscription != null && needRequested < 0) { // need to request more from subscription
49                 // try to fixup by making request
50                 if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
51                     return@loop // continue looping if failed
52                 subscription.request((request - needRequested).toLong())
53                 return
54             }
55             // just do book-keeping
56             if (_requested.compareAndSet(wasRequested, needRequested)) return
57         }
58     }
59 
60     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onReceiveDequeuednull61     override fun onReceiveDequeued() {
62         _requested.incrementAndGet()
63     }
64 
65     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
onClosedIdempotentnull66     override fun onClosedIdempotent() {
67         _subscription.getAndSet(null)?.cancel() // cancel exactly once
68     }
69 
70     // --------------------- Subscriber overrides -------------------------------
onSubscribenull71     override fun onSubscribe(s: Subscription) {
72         _subscription.value = s
73         while (true) { // lock-free loop on _requested
74             if (isClosedForSend) {
75                 s.cancel()
76                 return
77             }
78             val wasRequested = _requested.value
79             if (wasRequested >= request) return // ok -- normal story
80             // otherwise, receivers came before we had subscription or need to make initial request
81             // try to fixup by making request
82             if (!_requested.compareAndSet(wasRequested, request)) continue
83             s.request((request - wasRequested).toLong())
84             return
85         }
86     }
87 
onNextnull88     override fun onNext(t: T) {
89         _requested.decrementAndGet()
90         trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
91     }
92 
onCompletenull93     override fun onComplete() {
94         close(cause = null)
95     }
96 
onErrornull97     override fun onError(e: Throwable) {
98         close(cause = e)
99     }
100 }
101 
102 /** @suppress */
103 @Deprecated(
104     message = "Transforming publisher to channel is deprecated, use asFlow() instead",
105     level = DeprecationLevel.HIDDEN) // ERROR in 1.4, HIDDEN in 1.6.0
openSubscriptionnull106 public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
107     val channel = SubscriptionChannel<T>(request)
108     subscribe(channel)
109     return channel
110 }
111