<lambda>null1 package kotlinx.coroutines.reactive
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7 import kotlinx.coroutines.flow.internal.*
8 import kotlinx.coroutines.intrinsics.*
9 import org.reactivestreams.*
10 import java.util.*
11 import kotlin.coroutines.*
12 import kotlinx.coroutines.internal.*
13
14 /**
15 * Transforms the given reactive [Publisher] into [Flow].
16 * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
17 * In effect, it specifies the value of the subscription's [request][Subscription.request].
18 * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
19 *
20 * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
21 * elements are discarded.
22 *
23 * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
24 * see its documentation for additional details.
25 */
26 public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
27 PublisherAsFlow(this)
28
29 /**
30 * Transforms the given flow into a reactive specification compliant [Publisher].
31 *
32 * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
33 * see its documentation for additional details.
34 *
35 * An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
36 * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
37 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
38 * is used, so calls are performed from an arbitrary thread.
39 */
40 @JvmOverloads // binary compatibility
41 public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
42 FlowAsPublisher(this, Dispatchers.Unconfined + context)
43
44 private class PublisherAsFlow<T : Any>(
45 private val publisher: Publisher<T>,
46 context: CoroutineContext = EmptyCoroutineContext,
47 capacity: Int = Channel.BUFFERED,
48 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
49 ) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
50 override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
51 PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
52
53 /*
54 * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
55 * It's too counter-intuitive to be public, and moving it to Flow companion
56 * will also create undesired effect.
57 */
58 @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
59 private val requestSize: Long
60 get() =
61 if (onBufferOverflow != BufferOverflow.SUSPEND) {
62 Long.MAX_VALUE // request all, since buffering strategy is to never suspend
63 } else when (capacity) {
64 Channel.RENDEZVOUS -> 1L // need to request at least one anyway
65 Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all", must be Long.MAX_VALUE
66 Channel.BUFFERED -> Channel.CHANNEL_DEFAULT_CAPACITY.toLong()
67 else -> capacity.toLong().also { check(it >= 1) }
68 }
69
70 override suspend fun collect(collector: FlowCollector<T>) {
71 val collectContext = coroutineContext
72 val newDispatcher = context[ContinuationInterceptor]
73 if (newDispatcher == null || newDispatcher == collectContext[ContinuationInterceptor]) {
74 // fast path -- subscribe directly in this dispatcher
75 return collectImpl(collectContext + context, collector)
76 }
77 // slow path -- produce in a separate dispatcher
78 collectSlowPath(collector)
79 }
80
81 private suspend fun collectSlowPath(collector: FlowCollector<T>) {
82 coroutineScope {
83 collector.emitAll(produceImpl(this + context))
84 }
85 }
86
87 private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
88 val subscriber = ReactiveSubscriber<T>(capacity, onBufferOverflow, requestSize)
89 // inject subscribe context into publisher
90 publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
91 try {
92 var consumed = 0L
93 while (true) {
94 val value = subscriber.takeNextOrNull() ?: break
95 coroutineContext.ensureActive()
96 collector.emit(value)
97 if (++consumed == requestSize) {
98 consumed = 0L
99 subscriber.makeRequest()
100 }
101 }
102 } finally {
103 subscriber.cancel()
104 }
105 }
106
107 // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
108 override suspend fun collectTo(scope: ProducerScope<T>) =
109 collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
110 }
111
112 @Suppress("ReactiveStreamsSubscriberImplementation")
113 private class ReactiveSubscriber<T : Any>(
114 capacity: Int,
115 onBufferOverflow: BufferOverflow,
116 private val requestSize: Long
117 ) : Subscriber<T> {
118 private lateinit var subscription: Subscription
119
120 // This implementation of ReactiveSubscriber always uses "offer" in its onNext implementation and it cannot
121 // be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel
122 private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow)
123
takeNextOrNullnull124 suspend fun takeNextOrNull(): T? {
125 val result = channel.receiveCatching()
126 result.exceptionOrNull()?.let { throw it }
127 return result.getOrElse { null } // Closed channel
128 }
129
onNextnull130 override fun onNext(value: T) {
131 // Controlled by requestSize
132 require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" }
133 }
134
onCompletenull135 override fun onComplete() {
136 channel.close()
137 }
138
onErrornull139 override fun onError(t: Throwable?) {
140 channel.close(t)
141 }
142
onSubscribenull143 override fun onSubscribe(s: Subscription) {
144 subscription = s
145 makeRequest()
146 }
147
makeRequestnull148 fun makeRequest() {
149 subscription.request(requestSize)
150 }
151
cancelnull152 fun cancel() {
153 subscription.cancel()
154 }
155 }
156
157 // ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
158 // If `kotlinx-coroutines-reactor` module is not included, the list is empty.
159 private val contextInjectors: Array<ContextInjector> =
160 ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader)
161 .iterator().asSequence()
162 .toList().toTypedArray() // R8 opto
163
injectCoroutineContextnull164 internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
165 contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
166
167 /**
168 * Adapter that transforms [Flow] into TCK-complaint [Publisher].
169 * [cancel] invocation cancels the original flow.
170 */
171 @Suppress("ReactiveStreamsPublisherImplementation")
172 private class FlowAsPublisher<T : Any>(
173 private val flow: Flow<T>,
174 private val context: CoroutineContext
175 ) : Publisher<T> {
subscribenull176 override fun subscribe(subscriber: Subscriber<in T>?) {
177 if (subscriber == null) throw NullPointerException()
178 subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
179 }
180 }
181
182 /** @suppress */
183 @InternalCoroutinesApi
184 public class FlowSubscription<T>(
185 @JvmField public val flow: Flow<T>,
186 @JvmField public val subscriber: Subscriber<in T>,
187 context: CoroutineContext
188 ) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) {
189 /*
190 * We deliberately set initParentJob to false and do not establish parent-child
191 * relationship because FlowSubscription doesn't support it
192 */
193 private val requested = atomic(0L)
194 private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
195 @Volatile
196 private var cancellationRequested = false
197
198 // This code wraps startCoroutineCancellable into continuation
<lambda>null199 private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
200 ::flowProcessing.startCoroutineCancellable(this)
201 }
202
flowProcessingnull203 private suspend fun flowProcessing() {
204 try {
205 consumeFlow()
206 } catch (cause: Throwable) {
207 @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
208 val unwrappedCause = unwrap(cause)
209 if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
210 try {
211 subscriber.onError(cause)
212 } catch (e: Throwable) {
213 // Last ditch report
214 cause.addSuppressed(e)
215 handleCoroutineException(coroutineContext, cause)
216 }
217 }
218 return
219 }
220 // We only call this if `consumeFlow()` finished successfully
221 try {
222 subscriber.onComplete()
223 } catch (e: Throwable) {
224 handleCoroutineException(coroutineContext, e)
225 }
226 }
227
228 /*
229 * This method has at most one caller at any time (triggered from the `request` method)
230 */
consumeFlownull231 private suspend fun consumeFlow() {
232 flow.collect { value ->
233 // Emit the value
234 subscriber.onNext(value)
235 // Suspend if needed before requesting the next value
236 if (requested.decrementAndGet() <= 0) {
237 suspendCancellableCoroutine<Unit> {
238 producer.value = it
239 }
240 } else {
241 // check for cancellation if we don't suspend
242 coroutineContext.ensureActive()
243 }
244 }
245 }
246
cancelnull247 override fun cancel() {
248 cancellationRequested = true
249 cancel(null)
250 }
251
requestnull252 override fun request(n: Long) {
253 if (n <= 0) return
254 val old = requested.getAndUpdate { value ->
255 val newValue = value + n
256 if (newValue <= 0L) Long.MAX_VALUE else newValue
257 }
258 if (old <= 0L) {
259 assert(old == 0L)
260 // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
261 while (true) {
262 val producer = producer.getAndSet(null) ?: continue // spin if not set yet
263 producer.resume(Unit)
264 break
265 }
266 }
267 }
268 }
269