1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import org.reactivestreams.*
11
/**
 * Subscribes to this [Publisher] and invokes [action] once for every element it emits.
 *
 * The publisher is consumed through a channel obtained from [toChannel] with its default request size.
 * When [action] throws, the subscription is cancelled and the exception propagates out of [collect];
 * an error signalled by the publisher is likewise rethrown from [collect].
 */
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit {
    val elements = toChannel()
    elements.consumeEach(action)
}
20
/**
 * Creates a [SubscriptionChannel] configured with the given [request] size, subscribes it to this
 * [Publisher] and returns it as a [ReceiveChannel].
 *
 * Published API only because the public inline [collect] calls it.
 */
@PublishedApi
internal fun <T> Publisher<T>.toChannel(request: Int = 1): ReceiveChannel<T> =
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
27
/**
 * A [Subscriber] that forwards every signal it receives into an unlimited-capacity [BufferedChannel],
 * so that a publisher can be consumed through the channel API.
 *
 * Demand bookkeeping lives in [_requested]; the subscription is asked for more elements only when that
 * counter shows that more receivers are waiting than elements have been requested.
 *
 * @param request baseline number of elements kept requested from the subscription; must be non-negative,
 * otherwise an [IllegalArgumentException] is thrown on construction.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
private class SubscriptionChannel<T>(
    private val request: Int
) : BufferedChannel<T>(capacity = Channel.UNLIMITED), Subscriber<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // current subscription: stored by onSubscribe, swapped back to null by onClosedIdempotent
    private val _subscription = atomic<Subscription?>(null)

    // requested from subscription minus number of received minus number of enqueued receivers,
    // can be negative if we have receivers, but no subscription yet
    private val _requested = atomic(0)

    // --------------------- BufferedChannel overrides -------------------------------

    /**
     * Accounts for one more enqueued receiver. If a subscription is present and the counter would
     * drop below zero, the counter is reset to [request] and the shortfall is requested from the
     * subscription; otherwise the counter is simply decremented.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveEnqueued() {
        _requested.loop { wasRequested ->
            val subscription = _subscription.value
            val needRequested = wasRequested - 1
            if (subscription != null && needRequested < 0) { // need to request more from subscription
                // try to fixup by making request
                if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
                    return@loop // continue looping if failed
                // Widen before subtracting: needRequested is negative here, so Int arithmetic would wrap
                // to a negative demand when request is close to Int.MAX_VALUE.
                subscription.request(request.toLong() - needRequested)
                return
            }
            // just do book-keeping
            if (_requested.compareAndSet(wasRequested, needRequested)) return
        }
    }

    /**
     * Book-keeping counterpart of [onReceiveEnqueued]: adds one back to the demand counter.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /**
     * Cancels the stored subscription, if any, clearing the reference in the same atomic step.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent() {
        _subscription.getAndSet(null)?.cancel() // cancel exactly once
    }

    // --------------------- Subscriber overrides -------------------------------

    /**
     * Stores the subscription [s] and, unless the channel is already closed for send (in which case
     * [s] is cancelled), tops the demand counter up to [request] by requesting the difference.
     */
    override fun onSubscribe(s: Subscription) {
        _subscription.value = s
        while (true) { // lock-free loop on _requested
            if (isClosedForSend) {
                s.cancel()
                return
            }
            val wasRequested = _requested.value
            if (wasRequested >= request) return // ok -- normal story
            // otherwise, receivers came before we had subscription or need to make initial request
            // try to fixup by making request
            if (!_requested.compareAndSet(wasRequested, request)) continue
            // Widen before subtracting: wasRequested may be negative (receivers arrived first), so Int
            // arithmetic would wrap to a negative demand when request is close to Int.MAX_VALUE.
            s.request(request.toLong() - wasRequested)
            return
        }
    }

    /**
     * Records that one requested element has arrived and hands [t] over to the channel.
     */
    override fun onNext(t: T) {
        _requested.decrementAndGet()
        trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
    }

    /** Closes the channel without a cause. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Closes the channel with [e] as the cause. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
101
/**
 * Deprecated and hidden entry point, apparently retained for compatibility with older callers; it
 * performs the same steps as [toChannel] and therefore simply delegates to it.
 *
 * @suppress
 */
@Deprecated(
    message = "Transforming publisher to channel is deprecated, use asFlow() instead",
    level = DeprecationLevel.HIDDEN) // ERROR in 1.4, HIDDEN in 1.6.0
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> =
    toChannel(request)
111