/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.flow.internal.*
12 import kotlinx.coroutines.intrinsics.*
13 import org.reactivestreams.*
14 import java.util.*
15 import kotlin.coroutines.*
16 import kotlinx.coroutines.internal.*
17
/**
 * Wraps this reactive [Publisher] into a [Flow].
 *
 * Apply the [buffer] operator to the returned flow to choose the back-pressure window; in effect it
 * determines the value passed to the subscription's [request][Subscription.request].
 * Without an explicit `buffer`, the [default buffer capacity][Channel.BUFFERED] of a suspending channel is used.
 *
 * When any transformation applied to the resulting flow fails, the subscription is cancelled immediately and
 * all in-flight elements are discarded.
 *
 * This function is integrated with `ReactorContext` from the `kotlinx-coroutines-reactor` module,
 * see its documentation for additional details.
 */
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> {
    return PublisherAsFlow(this)
}
32
/**
 * Exposes this flow as a reactive specification compliant [Publisher].
 *
 * This function is integrated with `ReactorContext` from the `kotlinx-coroutines-reactor` module,
 * see its documentation for additional details.
 *
 * The optional [context] controls the execution context of the calls made to the [Subscriber] methods.
 * Pass a [CoroutineDispatcher] to confine those calls to a specific thread, or any [ThreadContextElement] to
 * inject additional context into the calling thread. When no dispatcher is given, the
 * [Unconfined][Dispatchers.Unconfined] dispatcher is used, so calls are performed from an arbitrary thread.
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> {
    // `Unconfined` is on the left of `+`, so it only acts as the default for elements missing in `context`.
    val subscriberContext = Dispatchers.Unconfined + context
    return FlowAsPublisher(this, subscriberContext)
}
47
/**
 * [ChannelFlow] implementation that subscribes to [publisher] on every collection and relays the
 * published elements to the collector, requesting them from the publisher in batches of [requestSize].
 */
private class PublisherAsFlow<T : Any>(
    private val publisher: Publisher<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Builds a copy of this flow over the same publisher with the supplied parameters.
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
    }

    /*
     * Number of elements asked from the publisher per `request` call.
     *
     * The @Suppress is needed to reach Channel.CHANNEL_DEFAULT_CAPACITY.
     * It's too counter-intuitive to be public, and moving it to Flow companion
     * will also create undesired effect.
     */
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
    private val requestSize: Long
        get() = when {
            // request all, since buffering strategy is to never suspend
            onBufferOverflow != BufferOverflow.SUSPEND -> Long.MAX_VALUE
            // need to request at least one anyway
            capacity == Channel.RENDEZVOUS -> 1L
            // reactive streams way to say "give all", must be Long.MAX_VALUE
            capacity == Channel.UNLIMITED -> Long.MAX_VALUE
            capacity == Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong()
            // an explicit capacity must be positive
            else -> capacity.toLong().also { check(it >= 1) }
        }

    override suspend fun collect(collector: FlowCollector<T>) {
        val callerContext = coroutineContext
        val requestedInterceptor = context[ContinuationInterceptor]
        val dispatcherChanges =
            requestedInterceptor != null && requestedInterceptor != callerContext[ContinuationInterceptor]
        if (dispatcherChanges) {
            // slow path -- produce in a separate dispatcher
            collectSlowPath(collector)
        } else {
            // fast path -- subscribe directly in this dispatcher
            collectImpl(callerContext + context, collector)
        }
    }

    // Produces the elements in a scope extended with this flow's context and forwards them to the collector.
    private suspend fun collectSlowPath(collector: FlowCollector<T>) {
        coroutineScope {
            val producerScope = this + context
            collector.emitAll(produceImpl(producerScope))
        }
    }

    /*
     * Subscribes to the publisher and pumps its elements into the collector until the subscriber reports
     * the end of the stream (a null element). The subscription is cancelled on every exit path.
     */
    private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
        val subscriber = ReactiveSubscriber<T>(capacity, onBufferOverflow, requestSize)
        // inject subscribe context into publisher
        publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
        try {
            var sinceLastRequest = 0L
            var element = subscriber.takeNextOrNull()
            while (element != null) {
                coroutineContext.ensureActive()
                collector.emit(element)
                sinceLastRequest++
                // the whole requested batch has been delivered downstream -- ask for the next one
                if (sinceLastRequest == requestSize) {
                    sinceLastRequest = 0L
                    subscriber.makeRequest()
                }
                element = subscriber.takeNextOrNull()
            }
        } finally {
            subscriber.cancel()
        }
    }

    // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
    override suspend fun collectTo(scope: ProducerScope<T>) {
        collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
    }
}
115
/**
 * [Subscriber] that parks every received element in a [Channel] from which the collecting coroutine
 * pulls them via [takeNextOrNull]. Elements are requested from the subscription in batches of [requestSize].
 *
 * [cancel] may be invoked before the publisher has delivered [onSubscribe]; in that case the
 * subscription is cancelled as soon as it arrives and nothing is requested from it.
 */
@Suppress("ReactiveStreamsSubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
    capacity: Int,
    onBufferOverflow: BufferOverflow,
    private val requestSize: Long
) : Subscriber<T> {
    // Written by `onSubscribe` and read by `makeRequest`/`cancel`, which may run on different threads.
    @Volatile
    private var subscription: Subscription? = null

    // Raised by `cancel`; checked by `onSubscribe` so that a subscription arriving late is not leaked.
    @Volatile
    private var cancelled = false

    // This implementation of ReactiveSubscriber always uses "offer" in its onNext implementation and it cannot
    // be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel
    private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow)

    /**
     * Receives the next element, suspending until one is available.
     * Returns `null` once the channel is closed without a cause and rethrows the cause otherwise.
     */
    suspend fun takeNextOrNull(): T? {
        val result = channel.receiveCatching()
        result.exceptionOrNull()?.let { throw it }
        return result.getOrElse { null } // Closed channel
    }

    override fun onNext(value: T) {
        // Controlled by requestSize
        require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" }
    }

    override fun onComplete() {
        channel.close()
    }

    override fun onError(t: Throwable?) {
        channel.close(t)
    }

    override fun onSubscribe(s: Subscription) {
        subscription = s
        // Both fields are volatile: either this check observes the flag or `cancel` observes the subscription
        // (possibly both, in which case the subscription is simply cancelled twice).
        if (cancelled) {
            s.cancel()
            return
        }
        s.request(requestSize)
    }

    /** Requests the next batch of [requestSize] elements; must not be called before [onSubscribe]. */
    fun makeRequest() {
        val active = checkNotNull(subscription) { "makeRequest was invoked before onSubscribe" }
        active.request(requestSize)
    }

    /**
     * Cancels the subscription. Safe to call before [onSubscribe]: the cancellation is remembered
     * and applied to the subscription when it is delivered.
     */
    fun cancel() {
        cancelled = true
        subscription?.cancel()
    }
}
160
// All `ContextInjector` services discovered through `ServiceLoader`, materialized once into an array.
// The service is implemented in the `kotlinx-coroutines-reactor` module only;
// when that module is not included, the array is empty.
// NOTE: keep the shape of the call chain below as is -- it is marked as an R8 optimization.
private val contextInjectors: Array<ContextInjector> =
    ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader)
        .iterator()
        .asSequence()
        .toList()
        .toTypedArray() // R8 opto
167
/**
 * Passes this publisher through every registered context injector in turn, each one receiving the
 * publisher returned by the previous one together with [coroutineContext].
 * Returns the receiver itself when no injectors are registered.
 */
internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
    contextInjectors.fold(initial = this) { decorated, injector ->
        injector.injectCoroutineContext(decorated, coroutineContext)
    }
170
/**
 * Adapter that transforms [Flow] into TCK-compliant [Publisher].
 * Every subscriber receives its own [FlowSubscription]; invoking [cancel][FlowSubscription.cancel] on it
 * cancels the original flow.
 */
@Suppress("ReactiveStreamsPublisherImplementation")
private class FlowAsPublisher<T : Any>(
    private val flow: Flow<T>,
    private val context: CoroutineContext
) : Publisher<T> {
    override fun subscribe(subscriber: Subscriber<in T>?) {
        // A null subscriber is rejected with NullPointerException before anything else happens.
        val target = subscriber ?: throw NullPointerException()
        target.onSubscribe(FlowSubscription(flow, target, context))
    }
}
185
/**
 * [Subscription] that collects [flow] in its own coroutine and hands every value to [subscriber],
 * emitting only as many values as were asked for through [request].
 *
 * @suppress
 */
@InternalCoroutinesApi
public class FlowSubscription<T>(
    @JvmField public val flow: Flow<T>,
    @JvmField public val subscriber: Subscriber<in T>,
    context: CoroutineContext
) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) {
    /*
     * We deliberately set initParentJob to false and do not establish parent-child
     * relationship because FlowSubscription doesn't support it
     */

    // Outstanding demand: raised by `request`, lowered by one after every delivered value.
    private val requested = atomic(0L)

    // Continuation to resume when demand appears: initially the one that starts the flow collection,
    // later the suspended emitter. It is null while the emitter is running.
    private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())

    // Set once the subscriber has invoked `cancel`.
    @Volatile
    private var cancellationRequested = false

    // This code wraps startCoroutineCancellable into continuation
    private fun createInitialContinuation(): Continuation<Unit> {
        return Continuation(coroutineContext) { _ ->
            ::flowProcessing.startCoroutineCancellable(this)
        }
    }

    /*
     * Body of the coroutine: consumes the flow and then signals the terminal event to the subscriber.
     */
    private suspend fun flowProcessing() {
        val failure: Throwable? = try {
            consumeFlow()
            null
        } catch (cause: Throwable) {
            cause
        }
        if (failure == null) {
            // We only call this if `consumeFlow()` finished successfully
            try {
                subscriber.onComplete()
            } catch (e: Throwable) {
                handleCoroutineException(coroutineContext, e)
            }
            return
        }
        @Suppress("INVISIBLE_MEMBER")
        val unwrappedCause = unwrap(failure)
        // The only failure that is not reported is this coroutine's own cancellation exception, observed
        // after the subscriber asked for cancellation and the coroutine stopped being active.
        val cancelledBySubscriber =
            cancellationRequested && !isActive && unwrappedCause === getCancellationException()
        if (cancelledBySubscriber) return
        try {
            subscriber.onError(failure)
        } catch (e: Throwable) {
            // Last ditch report
            failure.addSuppressed(e)
            handleCoroutineException(coroutineContext, failure)
        }
    }

    /*
     * This method has at most one caller at any time (triggered from the `request` method)
     */
    private suspend fun consumeFlow() {
        flow.collect { value ->
            // Emit the value
            subscriber.onNext(value)
            val outstanding = requested.decrementAndGet()
            if (outstanding > 0) {
                // check for cancellation if we don't suspend
                coroutineContext.ensureActive()
            } else {
                // Demand is exhausted: park until `request` picks the continuation up and resumes it
                suspendCancellableCoroutine<Unit> { continuation ->
                    producer.value = continuation
                }
            }
        }
    }

    override fun cancel() {
        // Raise the flag first so that `flowProcessing` can tell a subscriber-initiated cancellation apart
        cancellationRequested = true
        cancel(null)
    }

    override fun request(n: Long) {
        // Non-positive requests are ignored without signalling the subscriber.
        // NOTE(review): Reactive Streams rule 3.9 asks for onError(IllegalArgumentException) in this case --
        // confirm that the deviation is intentional.
        if (n <= 0) return
        val previous = requested.getAndUpdate { current ->
            val sum = current + n
            // a non-positive sum means the addition overflowed -- saturate at Long.MAX_VALUE
            if (sum <= 0L) Long.MAX_VALUE else sum
        }
        // There was demand already, so the emitter is running and will observe the new value by itself
        if (previous > 0L) return
        assert(previous == 0L)
        // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
        while (true) {
            val parked = producer.getAndSet(null)
            if (parked != null) {
                parked.resume(Unit)
                return
            }
            // not set yet -- spin
        }
    }
}
273