/* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import org.reactivestreams.* /** * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it. * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher. * @param request how many items to request from publisher in advance (optional, one by default). * * This method is deprecated in the favor of [Flow]. * Instead of iterating over the resulting channel please use [collect][Flow.collect]: * ``` * asFlow().collect { value -> * // process value * } * ``` */ @Deprecated( message = "Transforming publisher to channel is deprecated, use asFlow() instead", level = DeprecationLevel.WARNING) // Will be error in 1.4 public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel { val channel = SubscriptionChannel(request) subscribe(channel) return channel } // Will be promoted to error in 1.3.0, removed in 1.4.0 @Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) public suspend inline fun Publisher.consumeEach(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** * Subscribes to this [Publisher] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ public suspend inline fun Publisher.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation") private class SubscriptionChannel( private val request: Int ) : LinkedListChannel(null), Subscriber { init { require(request >= 0) { "Invalid request size: $request" } } private val _subscription = atomic(null) // requested from subscription minus number of received minus number of enqueued receivers, // can be negative if we have receivers, but no subscription yet private val _requested = atomic(0) // AbstractChannel overrides @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") override fun onReceiveEnqueued() { _requested.loop { wasRequested -> val subscription = _subscription.value val needRequested = wasRequested - 1 if (subscription != null && needRequested < 0) { // need to request more from subscription // try to fixup by making request if (wasRequested != request && !_requested.compareAndSet(wasRequested, request)) return@loop // continue looping if failed subscription.request((request - needRequested).toLong()) return } // just do book-keeping if (_requested.compareAndSet(wasRequested, needRequested)) return } } @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") override fun onReceiveDequeued() { _requested.incrementAndGet() } @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") override fun onClosedIdempotent(closed: LockFreeLinkedListNode) { _subscription.getAndSet(null)?.cancel() // cancel exactly once } // Subscriber overrides override fun onSubscribe(s: Subscription) { _subscription.value = s while (true) { // lock-free loop on _requested if (isClosedForSend) { s.cancel() return } val wasRequested = _requested.value if (wasRequested >= request) return // ok -- normal story // otherwise, receivers came before we had subscription or need to make initial request // try to fixup by making request if (!_requested.compareAndSet(wasRequested, request)) continue s.request((request - wasRequested).toLong()) return } } override fun onNext(t: T) { _requested.decrementAndGet() offer(t) } override fun onComplete() { close(cause = null) } override fun onError(e: Throwable) { close(cause = e) } }