• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.flow.internal.*
12 import kotlinx.coroutines.intrinsics.*
13 import org.reactivestreams.*
14 import java.util.*
15 import kotlin.coroutines.*
16 import kotlinx.coroutines.internal.*
17 
18 /**
19  * Transforms the given reactive [Publisher] into [Flow].
20  * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
21  * In effect, it specifies the value of the subscription's [request][Subscription.request].
22  * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
23  *
24  * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
25  * elements are discarded.
26  *
27  * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
28  * see its documentation for additional details.
29  */
30 public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
31     PublisherAsFlow(this)
32 
33 /**
34  * Transforms the given flow into a reactive specification compliant [Publisher].
35  *
36  * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
37  * see its documentation for additional details.
38  *
39  * An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
40  * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
41  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
42  * is used, so calls are performed from an arbitrary thread.
43  */
44 @JvmOverloads // binary compatibility
45 public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
46     FlowAsPublisher(this, Dispatchers.Unconfined + context)
47 
48 private class PublisherAsFlow<T : Any>(
49     private val publisher: Publisher<T>,
50     context: CoroutineContext = EmptyCoroutineContext,
51     capacity: Int = Channel.BUFFERED,
52     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
53 ) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
54     override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
55         PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
56 
57     /*
58      * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
59      * It's too counter-intuitive to be public, and moving it to Flow companion
60      * will also create undesired effect.
61      */
62     @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
63     private val requestSize: Long
64         get() =
65             if (onBufferOverflow != BufferOverflow.SUSPEND) {
66                 Long.MAX_VALUE // request all, since buffering strategy is to never suspend
67             } else when (capacity) {
68                 Channel.RENDEZVOUS -> 1L // need to request at least one anyway
69                 Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all", must be Long.MAX_VALUE
70                 Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong()
71                 else -> capacity.toLong().also { check(it >= 1) }
72             }
73 
74     override suspend fun collect(collector: FlowCollector<T>) {
75         val collectContext = coroutineContext
76         val newDispatcher = context[ContinuationInterceptor]
77         if (newDispatcher == null || newDispatcher == collectContext[ContinuationInterceptor]) {
78             // fast path -- subscribe directly in this dispatcher
79             return collectImpl(collectContext + context, collector)
80         }
81         // slow path -- produce in a separate dispatcher
82         collectSlowPath(collector)
83     }
84 
85     private suspend fun collectSlowPath(collector: FlowCollector<T>) {
86         coroutineScope {
87             collector.emitAll(produceImpl(this + context))
88         }
89     }
90 
91     private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
92         val subscriber = ReactiveSubscriber<T>(capacity, onBufferOverflow, requestSize)
93         // inject subscribe context into publisher
94         publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
95         try {
96             var consumed = 0L
97             while (true) {
98                 val value = subscriber.takeNextOrNull() ?: break
99                 coroutineContext.ensureActive()
100                 collector.emit(value)
101                 if (++consumed == requestSize) {
102                     consumed = 0L
103                     subscriber.makeRequest()
104                 }
105             }
106         } finally {
107             subscriber.cancel()
108         }
109     }
110 
111     // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
112     override suspend fun collectTo(scope: ProducerScope<T>) =
113         collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
114 }
115 
116 @Suppress("ReactiveStreamsSubscriberImplementation")
117 private class ReactiveSubscriber<T : Any>(
118     capacity: Int,
119     onBufferOverflow: BufferOverflow,
120     private val requestSize: Long
121 ) : Subscriber<T> {
122     private lateinit var subscription: Subscription
123 
124     // This implementation of ReactiveSubscriber always uses "offer" in its onNext implementation and it cannot
125     // be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel
126     private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow)
127 
takeNextOrNullnull128     suspend fun takeNextOrNull(): T? {
129         val result = channel.receiveCatching()
130         result.exceptionOrNull()?.let { throw it }
131         return result.getOrElse { null } // Closed channel
132     }
133 
onNextnull134     override fun onNext(value: T) {
135         // Controlled by requestSize
136         require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" }
137     }
138 
onCompletenull139     override fun onComplete() {
140         channel.close()
141     }
142 
onErrornull143     override fun onError(t: Throwable?) {
144         channel.close(t)
145     }
146 
onSubscribenull147     override fun onSubscribe(s: Subscription) {
148         subscription = s
149         makeRequest()
150     }
151 
makeRequestnull152     fun makeRequest() {
153         subscription.request(requestSize)
154     }
155 
cancelnull156     fun cancel() {
157         subscription.cancel()
158     }
159 }
160 
161 // ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
162 // If `kotlinx-coroutines-reactor` module is not included, the list is empty.
163 private val contextInjectors: Array<ContextInjector> =
164     ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader)
165         .iterator().asSequence()
166         .toList().toTypedArray() // R8 opto
167 
injectCoroutineContextnull168 internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
169     contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
170 
171 /**
172  * Adapter that transforms [Flow] into TCK-complaint [Publisher].
173  * [cancel] invocation cancels the original flow.
174  */
175 @Suppress("ReactiveStreamsPublisherImplementation")
176 private class FlowAsPublisher<T : Any>(
177     private val flow: Flow<T>,
178     private val context: CoroutineContext
179 ) : Publisher<T> {
subscribenull180     override fun subscribe(subscriber: Subscriber<in T>?) {
181         if (subscriber == null) throw NullPointerException()
182         subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
183     }
184 }
185 
186 /** @suppress */
187 @InternalCoroutinesApi
188 public class FlowSubscription<T>(
189     @JvmField public val flow: Flow<T>,
190     @JvmField public val subscriber: Subscriber<in T>,
191     context: CoroutineContext
192 ) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) {
193     /*
194      * We deliberately set initParentJob to false and do not establish parent-child
195      * relationship because FlowSubscription doesn't support it
196      */
197     private val requested = atomic(0L)
198     private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
199     @Volatile
200     private var cancellationRequested = false
201 
202     // This code wraps startCoroutineCancellable into continuation
<lambda>null203     private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
204         ::flowProcessing.startCoroutineCancellable(this)
205     }
206 
flowProcessingnull207     private suspend fun flowProcessing() {
208         try {
209             consumeFlow()
210         } catch (cause: Throwable) {
211             @Suppress("INVISIBLE_MEMBER")
212             val unwrappedCause = unwrap(cause)
213             if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
214                 try {
215                     subscriber.onError(cause)
216                 } catch (e: Throwable) {
217                     // Last ditch report
218                     cause.addSuppressed(e)
219                     handleCoroutineException(coroutineContext, cause)
220                 }
221             }
222             return
223         }
224         // We only call this if `consumeFlow()` finished successfully
225         try {
226             subscriber.onComplete()
227         } catch (e: Throwable) {
228             handleCoroutineException(coroutineContext, e)
229         }
230     }
231 
232     /*
233      * This method has at most one caller at any time (triggered from the `request` method)
234      */
consumeFlownull235     private suspend fun consumeFlow() {
236         flow.collect { value ->
237             // Emit the value
238             subscriber.onNext(value)
239             // Suspend if needed before requesting the next value
240             if (requested.decrementAndGet() <= 0) {
241                 suspendCancellableCoroutine<Unit> {
242                     producer.value = it
243                 }
244             } else {
245                 // check for cancellation if we don't suspend
246                 coroutineContext.ensureActive()
247             }
248         }
249     }
250 
cancelnull251     override fun cancel() {
252         cancellationRequested = true
253         cancel(null)
254     }
255 
requestnull256     override fun request(n: Long) {
257         if (n <= 0) return
258         val old = requested.getAndUpdate { value ->
259             val newValue = value + n
260             if (newValue <= 0L) Long.MAX_VALUE else newValue
261         }
262         if (old <= 0L) {
263             assert(old == 0L)
264             // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
265             while (true) {
266                 val producer = producer.getAndSet(null) ?: continue // spin if not set yet
267                 producer.resume(Unit)
268                 break
269             }
270         }
271     }
272 }
273