• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7 import kotlinx.coroutines.flow.internal.*
8 import kotlinx.coroutines.intrinsics.*
9 import org.reactivestreams.*
10 import java.util.*
11 import kotlin.coroutines.*
12 import kotlinx.coroutines.internal.*
13 
/**
 * Wraps this reactive [Publisher] into a [Flow].
 *
 * The amount of back-pressure is configured by applying the [buffer] operator to the returned flow:
 * the buffer capacity determines the value passed to the subscription's [request][Subscription.request].
 * When no [buffer] is applied, the [default buffer capacity][Channel.BUFFERED] of a suspending channel is used.
 *
 * Should any downstream transformation of the returned flow fail, the subscription is cancelled right away
 * and every element that is still in flight is dropped.
 *
 * This function is integrated with `ReactorContext` from the `kotlinx-coroutines-reactor` module,
 * see its documentation for additional details.
 */
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> {
    return PublisherAsFlow(publisher = this)
}
28 
/**
 * Wraps this flow into a [Publisher] that complies with the reactive streams specification.
 *
 * This function is integrated with `ReactorContext` from the `kotlinx-coroutines-reactor` module,
 * see its documentation for additional details.
 *
 * The optional [context] controls the execution context in which the [Subscriber] methods are invoked.
 * Pass a [CoroutineDispatcher] to confine those calls to a specific thread, or [ThreadContextElement]s to
 * inject additional context into the calling thread. When nothing is passed, the
 * [Unconfined][Dispatchers.Unconfined] dispatcher is used, so the calls happen on an arbitrary thread.
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> {
    // Unconfined is the left operand, so an interceptor present in [context] takes precedence over it
    val publisherContext = Dispatchers.Unconfined + context
    return FlowAsPublisher(this, publisherContext)
}
43 
/**
 * [ChannelFlow] implementation that subscribes to [publisher] on every collection and
 * forwards the received elements to the collector.
 */
private class PublisherAsFlow<T : Any>(
    private val publisher: Publisher<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Same publisher, new channel configuration
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
    }

    /*
     * Number of elements asked from the subscription in a single `request` call.
     *
     * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
     * It's too counter-intuitive to be public, and moving it to Flow companion
     * will also create undesired effect.
     */
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
    private val requestSize: Long
        get() = when {
            // the buffering strategy never suspends, so everything can be requested at once
            onBufferOverflow != BufferOverflow.SUSPEND -> Long.MAX_VALUE
            // at least one element has to be requested anyway
            capacity == Channel.RENDEZVOUS -> 1L
            // Long.MAX_VALUE is the reactive streams way to say "give all"
            capacity == Channel.UNLIMITED -> Long.MAX_VALUE
            capacity == Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong()
            else -> capacity.toLong().also { check(it >= 1) }
        }

    override suspend fun collect(collector: FlowCollector<T>) {
        val callerContext = coroutineContext
        val requestedInterceptor = context[ContinuationInterceptor]
        val dispatcherChanges =
            requestedInterceptor != null && requestedInterceptor != callerContext[ContinuationInterceptor]
        if (dispatcherChanges) {
            // slow path -- produce in a separate dispatcher
            collectSlowPath(collector)
        } else {
            // fast path -- subscribe directly in the caller's dispatcher
            collectImpl(callerContext + context, collector)
        }
    }

    // Produces into a channel from a scope that carries the flow's context and re-emits from that channel
    private suspend fun collectSlowPath(collector: FlowCollector<T>) {
        coroutineScope {
            val producerScope = this + context
            collector.emitAll(produceImpl(producerScope))
        }
    }

    // Subscribes to the publisher and pumps its elements into the collector until the stream terminates
    private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
        val subscriber = ReactiveSubscriber<T>(capacity, onBufferOverflow, requestSize)
        // the publisher gets the subscribe context injected before the subscription happens
        publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
        try {
            var sinceLastRequest = 0L
            while (true) {
                val element = subscriber.takeNextOrNull()
                if (element == null) break // the publisher has completed
                coroutineContext.ensureActive()
                collector.emit(element)
                sinceLastRequest++
                if (sinceLastRequest == requestSize) {
                    // the whole requested batch was consumed, ask for the next one
                    sinceLastRequest = 0L
                    subscriber.makeRequest()
                }
            }
        } finally {
            // runs on completion, failure and cancellation alike
            subscriber.cancel()
        }
    }

    // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val producerContext = scope.coroutineContext
        val channelCollector = SendingCollector(scope.channel)
        collectImpl(producerContext, channelCollector)
    }
}
111 
/**
 * [Subscriber] that stores the incoming elements in a channel, from which they are
 * taken one by one with [takeNextOrNull].
 */
@Suppress("ReactiveStreamsSubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
    capacity: Int,
    onBufferOverflow: BufferOverflow,
    private val requestSize: Long
) : Subscriber<T> {
    private lateinit var subscription: Subscription

    // onNext always uses a non-suspending send, which cannot be reliable with a rendezvous channel,
    // so a rendezvous channel is replaced with a channel that buffers a single element
    private val channel = Channel<T>(
        capacity = if (capacity == Channel.RENDEZVOUS) 1 else capacity,
        onBufferOverflow = onBufferOverflow
    )

    /**
     * Receives the next element. Returns `null` when the channel was closed without a cause,
     * rethrows the cause when it was closed with one.
     */
    suspend fun takeNextOrNull(): T? {
        val received = channel.receiveCatching()
        val failure = received.exceptionOrNull()
        if (failure != null) throw failure
        return received.getOrNull() // null stands for a closed channel
    }

    override fun onNext(value: T) {
        // The number of elements in flight is controlled by requestSize
        val accepted = channel.trySend(value).isSuccess
        require(accepted) { "Element $value was not added to channel because it was full, $channel" }
    }

    override fun onComplete() {
        channel.close(cause = null)
    }

    override fun onError(t: Throwable?) {
        channel.close(cause = t)
    }

    override fun onSubscribe(s: Subscription) {
        subscription = s
        // ask for the first batch as soon as the subscription is known
        makeRequest()
    }

    /** Requests the next [requestSize] elements from the subscription. */
    fun makeRequest() = subscription.request(requestSize)

    /** Cancels the subscription. */
    fun cancel() = subscription.cancel()
}
156 
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
// The implementations are looked up through ServiceLoader with the class loader of ContextInjector itself
// and copied into an array when this property is initialized.
// NOTE(review): the "R8 opto" marker suggests that the exact shape of this call chain matters to R8 --
// confirm before restructuring it.
private val contextInjectors: Array<ContextInjector> =
    ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader)
        .iterator().asSequence()
        .toList().toTypedArray() // R8 opto
163 
// Passes this publisher through every registered context injector in turn, each one receiving
// the result of the previous one. Returns this publisher itself when no injector is registered.
internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext): Publisher<T> {
    var decorated = this
    for (injector in contextInjectors) {
        decorated = injector.injectCoroutineContext(decorated, coroutineContext)
    }
    return decorated
}
166 
/**
 * Adapter that exposes a [Flow] as a TCK-compliant [Publisher].
 * Every subscriber gets its own [FlowSubscription];
 * invoking [cancel][Subscription.cancel] on it cancels the original flow.
 */
@Suppress("ReactiveStreamsPublisherImplementation")
private class FlowAsPublisher<T : Any>(
    private val flow: Flow<T>,
    private val context: CoroutineContext
) : Publisher<T> {
    override fun subscribe(subscriber: Subscriber<in T>?) {
        // a null subscriber is rejected with a NullPointerException
        val target = subscriber ?: throw NullPointerException()
        val subscription = FlowSubscription(flow, target, context)
        target.onSubscribe(subscription)
    }
}
181 
/**
 * [Subscription] that collects [flow] and passes its values to [subscriber].
 *
 * Collection does not start on construction: it is started by the first [request] with a positive argument
 * and is paused (the collector is suspended) whenever the requested amount is used up.
 * [cancel] cancels this coroutine and therefore the collection of [flow].
 *
 * @suppress
 */
@InternalCoroutinesApi
public class FlowSubscription<T>(
    @JvmField public val flow: Flow<T>,
    @JvmField public val subscriber: Subscriber<in T>,
    context: CoroutineContext
) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) {
    /*
     * We deliberately set initParentJob to false and do not establish parent-child
     * relationship because FlowSubscription doesn't support it
     */
    // NOTE(review): the trailing `true` above is presumably the `active` flag of AbstractCoroutine -- confirm.

    // Outstanding demand: increased by `request`, decreased by `consumeFlow` after every `onNext`
    private val requested = atomic(0L)
    // Continuation to resume when new demand arrives: at first the one that starts `flowProcessing`,
    // later the continuation of the collector suspended in `consumeFlow`. `request` takes it out with getAndSet(null).
    private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
    // Set by `cancel()` before the coroutine is cancelled; read by `flowProcessing` when deciding whether to call onError
    @Volatile
    private var cancellationRequested = false

    // This code wraps startCoroutineCancellable into continuation
    private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
        ::flowProcessing.startCoroutineCancellable(this)
    }

    // Body of this coroutine: collects the flow and then signals either onError or onComplete to the subscriber
    private suspend fun flowProcessing() {
        try {
            consumeFlow()
        } catch (cause: Throwable) {
            @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
            val unwrappedCause = unwrap(cause)
            // onError is skipped only when all three hold: `cancel()` was called, this coroutine is no longer active,
            // and the failure is this coroutine's own cancellation exception
            if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
                try {
                    subscriber.onError(cause)
                } catch (e: Throwable) {
                    // Last ditch report
                    cause.addSuppressed(e)
                    handleCoroutineException(coroutineContext, cause)
                }
            }
            return
        }
        // We only call this if `consumeFlow()` finished successfully
        try {
            subscriber.onComplete()
        } catch (e: Throwable) {
            // a failure of onComplete is reported through the coroutine exception handling
            handleCoroutineException(coroutineContext, e)
        }
    }

    /*
     * This method has at most one caller at any time (triggered from the `request` method)
     */
    private suspend fun consumeFlow() {
        flow.collect { value ->
            // Emit the value
            subscriber.onNext(value)
            // Suspend if needed before requesting the next value
            if (requested.decrementAndGet() <= 0) {
                // No demand left: park the collector and publish its continuation so that `request` can resume it
                suspendCancellableCoroutine<Unit> {
                    producer.value = it
                }
            } else {
                // check for cancellation if we don't suspend
                coroutineContext.ensureActive()
            }
        }
    }

    // Subscription.cancel: remembers that the cancellation came from the subscriber, then cancels this coroutine
    override fun cancel() {
        cancellationRequested = true
        cancel(null)
    }

    // Subscription.request: adds `n` to the outstanding demand and resumes the producer if it was not running
    override fun request(n: Long) {
        // Non-positive requests are ignored without signalling anything to the subscriber
        // NOTE(review): Reactive Streams rule 3.9 asks for onError(IllegalArgumentException) here -- confirm this is intended
        if (n <= 0) return
        val old = requested.getAndUpdate { value ->
            val newValue = value + n
            // a non-positive sum means the addition has overflowed, so the demand is capped at Long.MAX_VALUE
            if (newValue <= 0L) Long.MAX_VALUE else newValue
        }
        if (old <= 0L) {
            assert(old == 0L)
            // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
            while (true) {
                val producer = producer.getAndSet(null) ?: continue // spin if not set yet
                producer.resume(Unit)
                break
            }
        }
    }
}
269