1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.flow.internal
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import kotlin.coroutines.*
12 import kotlin.coroutines.intrinsics.*
13 import kotlin.jvm.*
14
/**
 * Views this flow as a [ChannelFlow]: the receiver itself is returned when it already is one,
 * otherwise it is wrapped into a new [ChannelFlowOperatorImpl] with default parameters.
 */
internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
    when (this) {
        is ChannelFlow<T> -> this
        else -> ChannelFlowOperatorImpl(this)
    }
17
/**
 * Implemented by operators that are able to fuse with the [buffer] and [flowOn] operators
 * applied **downstream** of them.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
    /**
     * Invoked by the [flowOn] operator (passing a [context]) and by the [buffer] operator
     * (passing a [capacity]) when they are applied to this flow.
     *
     * A [capacity] of [Channel.CONFLATED] must not be passed here; callers are expected to desugar it
     * into `capacity = 0, onBufferOverflow = DROP_OLDEST` beforehand.
     */
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}
36
/**
 * Base class for operators whose "output" is a channel; such operators are always fused with each other.
 * It serves as a skeleton implementation of [FusibleFlow] and also provides cross-cutting
 * capabilities such as the ability to [produceIn] and [broadcastIn] the corresponding flow, so that
 * the backing channel can be used directly when it exists (hence the `ChannelFlow` name).
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // context of the upstream
    @JvmField public val context: CoroutineContext,
    // capacity of the buffer between the upstream and the downstream context
    @JvmField public val capacity: Int,
    // strategy applied when the buffer overflows
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    init {
        // Callers are required to desugar CONFLATED into (0, DROP_OLDEST) before getting here
        assert { capacity != Channel.CONFLATED }
    }

    // The one place where collectTo gets wrapped into a suspend lambda
    internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { producerScope -> collectTo(producerScope) }

    // Capacity handed over to produce/broadcast: an optional channel is replaced with a BUFFERED one
    private val produceCapacity: Int
        get() = when (capacity) {
            Channel.OPTIONAL_CHANNEL -> Channel.BUFFERED
            else -> capacity
        }

    /**
     * A [ChannelFlow] implementation that is able to work without a channel (i.e. supports
     * [Channel.OPTIONAL_CHANNEL]) should return a non-null flow from here. A caller may then use the returned
     * flow without the effect of additional [flowOn] and [buffer] operators, taking this flow's
     * [context], [capacity], and [onBufferOverflow] into account in its own implementation.
     *
     * The default implementation returns `null`.
     */
    public open fun dropChannelOperators(): Flow<T>? {
        return null
    }

    /**
     * Fuses this flow with a downstream operator described by [context], [capacity] and [onBufferOverflow].
     * Returns `this` when the fused configuration is equal to the current one, otherwise the result of [create].
     */
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // Callers are required to desugar CONFLATED into (0, DROP_OLDEST) before getting here
        assert { capacity != Channel.CONFLATED }
        // note: the upstream context that was specified earlier (this.context) takes precedence
        val fusedContext = context + this.context
        val fusedCapacity: Int
        val fusedOverflow: BufferOverflow
        if (onBufferOverflow == BufferOverflow.SUSPEND) {
            // suspending buffer => capacities are combined and the earlier overflow strategy is retained
            fusedCapacity = mergeCapacity(capacity)
            fusedOverflow = this.onBufferOverflow
        } else {
            // the added buffer never suspends => it overrides the preceding buffering configuration
            fusedCapacity = capacity
            fusedOverflow = onBufferOverflow
        }
        val unchanged = fusedContext == this.context &&
            fusedCapacity == this.capacity &&
            fusedOverflow == this.onBufferOverflow
        return if (unchanged) this else create(fusedContext, fusedCapacity, fusedOverflow)
    }

    // Combines this flow's capacity with the capacity requested by a downstream suspending buffer
    private fun mergeCapacity(downstream: Int): Int {
        val upstream = capacity
        return when {
            upstream == Channel.OPTIONAL_CHANNEL -> downstream
            downstream == Channel.OPTIONAL_CHANNEL -> upstream
            upstream == Channel.BUFFERED -> downstream
            downstream == Channel.BUFFERED -> upstream
            else -> {
                // sanity checks: only explicit non-negative capacities are expected at this point
                assert { upstream >= 0 }
                assert { downstream >= 0 }
                // a negative sum indicates Int overflow, which is clamped to UNLIMITED
                val total = upstream + downstream
                if (total < 0) Channel.UNLIMITED else total
            }
        }
    }

    // Creates a copy of this flow with the given (fused) parameters
    protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>

    // Collects this flow, sending its values to the given producer scope
    protected abstract suspend fun collectTo(scope: ProducerScope<T>)

    // broadcastImpl backs the broadcastIn operator, which is obsolete and replaced by SharedFlow.
    // BroadcastChannel has no support for onBufferOverflow other than simple conflation
    public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
        val broadcastCapacity = when (onBufferOverflow) {
            BufferOverflow.DROP_LATEST ->
                throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
            BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
            BufferOverflow.SUSPEND -> produceCapacity
        }
        return scope.broadcast(context, broadcastCapacity, start, block = collectToFun)
    }

    /**
     * ATOMIC start is used here on purpose (#1825).
     * NB: [produceImpl] is used for [flowOn].
     * With a non-atomic start it is possible to observe a situation where the pipeline after the [flowOn] call
     * successfully executes (mostly, its `onCompletion` handlers), while the pipeline before it does not,
     * because it was cancelled during its dispatch.
     * In that case `onCompletion` and `finally` blocks would not be executed, which may lead to
     * different kinds of memory leaks.
     */
    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
    }

    // Collects via a channel: produces into it in a new scope and emits everything it delivers
    override suspend fun collect(collector: FlowCollector<T>) {
        coroutineScope {
            val channel = produceImpl(this)
            collector.emitAll(channel)
        }
    }

    // Extra leading entry for the debug toString, or null when there is none
    protected open fun additionalToStringProps(): String? {
        return null
    }

    // debug toString: lists only the properties that differ from their defaults
    override fun toString(): String {
        val props = ArrayList<String>(4)
        val extra = additionalToStringProps()
        if (extra != null) props += extra
        if (context !== EmptyCoroutineContext) props += "context=$context"
        if (capacity != Channel.OPTIONAL_CHANNEL) props += "capacity=$capacity"
        if (onBufferOverflow != BufferOverflow.SUSPEND) props += "onBufferOverflow=$onBufferOverflow"
        return classSimpleName + props.joinToString(separator = ", ", prefix = "[", postfix = "]")
    }
}
150
// A ChannelFlow that is applied on top of another (upstream) flow
internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Collects the upstream flow into the given collector
    protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

    // Switches the upstream collecting context to newContext, while emissions are still collected in the original context
    private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
        val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
        // runs flowCollect(originalContextCollector) in the newContext
        withContextUndispatched(newContext, originalContextCollector) { downstream ->
            flowCollect(downstream)
        }
    }

    // Slow path, used when an output channel is required
    protected override suspend fun collectTo(scope: ProducerScope<T>) {
        flowCollect(SendingCollector(scope))
    }

    // Adds fast paths for the case when creating a channel is optional
    override suspend fun collect(collector: FlowCollector<T>) {
        // Slow-path: a channel was explicitly requested, so the actual channel is created
        if (capacity != Channel.OPTIONAL_CHANNEL) return super.collect(collector)
        // Fast-path candidates: channel creation is optional (flowOn/flowWith operators without buffer)
        val callerContext = coroutineContext
        val mergedContext = callerContext + context // the resulting collect context
        when {
            // #1: the resulting context is the same as it was -- fall back to plain collect
            mergedContext == callerContext ->
                flowCollect(collector)
            // #2: the dispatcher does not have to change -- channels are not needed
            mergedContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor] ->
                collectWithContextUndispatched(collector, mergedContext)
            // Slow-path: create the actual channel
            else ->
                super.collect(collector)
        }
    }

    // debug toString
    override fun toString(): String {
        return "$flow -> ${super.toString()}"
    }
}
191
/**
 * The plain channel flow operator: represents [flowOn], [buffer], or a fused combination of them.
 */
internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    // Same upstream flow, new fused parameters
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    }

    // Without the channel operators what remains is the upstream flow itself
    override fun dropChannelOperators(): Flow<T>? {
        return flow
    }

    // Collecting this operator is collecting the upstream flow as is
    override suspend fun flowCollect(collector: FlowCollector<T>) {
        flow.collect(collector)
    }
}
209
// If the underlying collector accepts concurrent emits, then the resulting one accepts them as well
// todo: we might need to generalize this pattern for "thread-safe" operators that can fuse with channels
private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> =
    if (this is SendingCollector || this is NopCollector) {
        // SendingCollector & NopCollector do not care about the context at all, so they are used as is
        this
    } else {
        // Any other collector is wrapped into the UndispatchedContextCollector implementation
        UndispatchedContextCollector(this, emitContext)
    }
218
// Collector that forwards every emitted value to the downstream collector via withContextUndispatched(emitContext, ...)
private class UndispatchedContextCollector<T>(
    downstream: FlowCollector<T>,
    private val emitContext: CoroutineContext
) : FlowCollector<T> {
    // computed once up front, so that each withContextUndispatched call is fast
    private val countOrElement = threadContextElements(emitContext)

    // the suspend function reference is allocated once, when the collector is created
    private val emitRef: suspend (T) -> Unit = { item -> downstream.emit(item) }

    override suspend fun emit(value: T) {
        withContextUndispatched(emitContext, value, countOrElement, emitRef)
    }
}
229
// Efficiently computes block(value) in the newContext.
// The block is started via startCoroutineUninterceptedOrReturn inside withCoroutineContext(newContext, countOrElement),
// and it completes into a StackFrameContinuation that wraps the caller's continuation and carries newContext.
internal suspend fun <T, V> withContextUndispatched(
    newContext: CoroutineContext,
    value: V,
    countOrElement: Any = threadContextElements(newContext), // may be precomputed by the caller for speed
    block: suspend (V) -> T
): T =
    suspendCoroutineUninterceptedOrReturn { callerContinuation ->
        withCoroutineContext(newContext, countOrElement) {
            val completion = StackFrameContinuation(callerContinuation, newContext)
            block.startCoroutineUninterceptedOrReturn(value, completion)
        }
    }
242
// Continuation that delegates to uCont and links it to the caller through a walkable CoroutineStackFrame
private class StackFrameContinuation<T>(
    private val uCont: Continuation<T>,
    override val context: CoroutineContext
) : Continuation<T>, CoroutineStackFrame {

    // The wrapped continuation is the caller frame when it is a stack frame itself, otherwise there is none
    override val callerFrame: CoroutineStackFrame?
        get() {
            return uCont as? CoroutineStackFrame
        }

    // Resumption is passed to the wrapped continuation as is
    override fun resumeWith(result: Result<T>) = uCont.resumeWith(result)

    // This frame has no stack trace element of its own
    override fun getStackTraceElement(): StackTraceElement? {
        return null
    }
}
257