• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import kotlin.coroutines.*
12 import kotlin.coroutines.intrinsics.*
13 import kotlin.jvm.*
14 
/**
 * Views this flow as a [ChannelFlow]: the receiver itself when it already is one,
 * otherwise the receiver wrapped into a [ChannelFlowOperatorImpl] created with its default settings.
 */
internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
    if (this is ChannelFlow<T>) this else ChannelFlowOperatorImpl(this)
17 
/**
 * Implemented by operators that are able to fuse with the [buffer] and [flowOn] operators
 * applied **downstream** of them.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
    /**
     * Invoked by the [flowOn] operator (passing a context) and by the [buffer] operator (passing a capacity)
     * when they are applied to this flow.
     *
     * A [capacity] of [Channel.CONFLATED] must not be passed here; callers are expected to desugar it
     * to `capacity = 0, onBufferOverflow = DROP_OLDEST` first.
     */
    public fun fuse(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): Flow<T>
}
36 
/**
 * Skeleton implementation of [FusibleFlow] for operators that use a channel as their "output";
 * such operators are always fused with each other.
 * Besides fusing, it provides cross-cutting methods such as the ability to [produceIn] and [broadcastIn]
 * the corresponding flow, which makes it possible to use the backing channel directly when one exists
 * (hence the `ChannelFlow` name).
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // upstream context
    @JvmField public val context: CoroutineContext,
    // buffer capacity between upstream and downstream context
    @JvmField public val capacity: Int,
    // buffer overflow strategy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    init {
        // Callers must desugar CONFLATED to (0, DROP_OLDEST) before constructing this flow
        assert { capacity != Channel.CONFLATED }
    }

    // Single place where the collectTo function is turned into a suspend lambda
    internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { producerScope -> collectTo(producerScope) }

    // Capacity actually used for the channel: OPTIONAL_CHANNEL is replaced by BUFFERED
    private val produceCapacity: Int
        get() = when (capacity) {
            Channel.OPTIONAL_CHANNEL -> Channel.BUFFERED
            else -> capacity
        }

    /**
     * An implementation that is able to work without a channel (i.e. supports [Channel.OPTIONAL_CHANNEL])
     * should return a non-null flow here. The caller can then use that flow without the effect of the
     * additional [flowOn] and [buffer] operators, folding this flow's
     * [context], [capacity], and [onBufferOverflow] into its own implementation.
     * The default implementation returns `null`.
     */
    public open fun dropChannelOperators(): Flow<T>? = null

    /**
     * Combines the given configuration with the configuration of this flow.
     * Returns `this` when the combined configuration equals the current one,
     * otherwise a new instance obtained from [create].
     */
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // Callers must desugar CONFLATED to (0, DROP_OLDEST) before fusing
        assert { capacity != Channel.CONFLATED }
        // The upstream context that was specified earlier (this.context) takes precedence
        val fusedContext = context + this.context
        val fusedCapacity: Int
        val fusedOverflow: BufferOverflow
        if (onBufferOverflow == BufferOverflow.SUSPEND) {
            // Suspending buffer: merge the capacities and retain the earlier overflow strategy
            fusedCapacity = when {
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                this.capacity == Channel.BUFFERED -> capacity
                capacity == Channel.BUFFERED -> this.capacity
                else -> {
                    // Both capacities are expected to be explicit non-negative sizes at this point
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // A negative total means the Int addition overflowed => clamp to UNLIMITED
                    val total = this.capacity + capacity
                    if (total < 0) Channel.UNLIMITED else total
                }
            }
            fusedOverflow = this.onBufferOverflow
        } else {
            // The added buffer never suspends => it replaces the preceding buffering configuration
            fusedCapacity = capacity
            fusedOverflow = onBufferOverflow
        }
        val unchanged = fusedContext == this.context &&
            fusedCapacity == this.capacity &&
            fusedOverflow == this.onBufferOverflow
        return if (unchanged) this else create(fusedContext, fusedCapacity, fusedOverflow)
    }

    // Creates a flow of the same kind with the given (already fused) configuration
    protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>

    // Collects this flow into the given producer scope
    protected abstract suspend fun collectTo(scope: ProducerScope<T>)

    // broadcastImpl backs the broadcastIn operator, which is obsolete and replaced by SharedFlow.
    // BroadcastChannel supports no onBufferOverflow strategy other than simple conflation.
    public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
        val channelCapacity = when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> produceCapacity
            BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
            BufferOverflow.DROP_LATEST ->
                throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
        }
        return scope.broadcast(context, channelCapacity, start, block = collectToFun)
    }

    /**
     * The producer is deliberately started with ATOMIC start (#1825).
     * NB: [produceImpl] is used for [flowOn].
     * With a non-atomic start the pipeline after the [flowOn] call may successfully run
     * (mostly its `onCompletion` handlers) while the pipeline before it does not, because it was cancelled
     * during its dispatch. In that case `onCompletion` and `finally` blocks of the upstream part
     * would not be executed, which may lead to different kinds of memory leaks.
     */
    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

    // Collects by producing into a channel inside a fresh coroutineScope and emitting everything from it
    override suspend fun collect(collector: FlowCollector<T>) {
        coroutineScope {
            val channel = produceImpl(this)
            collector.emitAll(channel)
        }
    }

    // Extra text that subclasses may contribute to the debug toString; none by default
    protected open fun additionalToStringProps(): String? = null

    // debug toString: lists only the properties that differ from their defaults
    override fun toString(): String {
        val parts = ArrayList<String>(4)
        val extra = additionalToStringProps()
        if (extra != null) parts.add(extra)
        if (context !== EmptyCoroutineContext) parts.add("context=$context")
        if (capacity != Channel.OPTIONAL_CHANNEL) parts.add("capacity=$capacity")
        if (onBufferOverflow != BufferOverflow.SUSPEND) parts.add("onBufferOverflow=$onBufferOverflow")
        val joined = parts.joinToString(", ")
        return "$classSimpleName[$joined]"
    }
}
150 
// A ChannelFlow that operates on another (upstream) flow placed before it
internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Collects the upstream flow into the given collector
    protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

    // Runs the upstream collection in newContext while values are still emitted in the original (current) context
    private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
        // Capture the current context so that emissions are delivered in it
        val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
        // Invoke flowCollect(originalContextCollector) in the newContext
        withContextUndispatched(newContext, value = originalContextCollector) { downstream ->
            flowCollect(downstream)
        }
    }

    // Slow path, taken when an output channel is required
    protected override suspend fun collectTo(scope: ProducerScope<T>) {
        flowCollect(SendingCollector(scope))
    }

    // Adds fast paths for the case when channel creation is optional
    override suspend fun collect(collector: FlowCollector<T>) {
        // Slow path: a channel was explicitly requested, so create the actual channel
        if (capacity != Channel.OPTIONAL_CHANNEL) return super.collect(collector)
        // Fast path candidates: channel creation is optional (flowOn/flowWith operators without buffer)
        val callerContext = coroutineContext
        val mergedContext = callerContext + context // the resulting collect context
        when {
            // #1: the resulting context is the same as it was -- fall back to plain collect
            mergedContext == callerContext -> flowCollect(collector)
            // #2: the dispatcher does not have to change, so channels can be avoided
            mergedContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor] ->
                collectWithContextUndispatched(collector, mergedContext)
            // Slow path: create the actual channel
            else -> super.collect(collector)
        }
    }

    // debug toString: the upstream flow followed by this operator's own description
    override fun toString(): String {
        return "$flow -> ${super.toString()}"
    }
}
191 
/**
 * The plain channel flow operator: a [flowOn], a [buffer], or a fused combination of the two.
 * It passes the upstream [flow] through unchanged.
 */
internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    // Same upstream flow, new configuration
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    }

    // Without the channel operators what remains is the upstream flow itself
    override fun dropChannelOperators(): Flow<T>? {
        return flow
    }

    // Delegates directly to the upstream flow
    override suspend fun flowCollect(collector: FlowCollector<T>) {
        flow.collect(collector)
    }
}
209 
// If the underlying collector was accepting concurrent emits, then the returned one does too
// todo: we might need to generalize this pattern for "thread-safe" operators that can fuse with channels
private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> {
    // SendingCollector & NopCollector do not care about the context at all, so they are used as is
    val contextAgnostic = this is SendingCollector || this is NopCollector
    // Any other collector is wrapped into an UndispatchedContextCollector bound to emitContext
    return if (contextAgnostic) this else UndispatchedContextCollector(this, emitContext)
}
218 
// Collector that forwards every emitted value to `downstream` via withContextUndispatched in `emitContext`
private class UndispatchedContextCollector<T>(
    downstream: FlowCollector<T>,
    private val emitContext: CoroutineContext
) : FlowCollector<T> {
    // Computed once up front so that each withContextUndispatched call can reuse it
    private val countOrElement = threadContextElements(emitContext)

    // The suspend function reference is allocated a single time, on creation
    private val emitRef: suspend (T) -> Unit = { item -> downstream.emit(item) }

    override suspend fun emit(value: T) {
        withContextUndispatched(
            newContext = emitContext,
            value = value,
            countOrElement = countOrElement,
            block = emitRef
        )
    }
}
229 
// Efficiently computes block(value) in the newContext.
// The block is started via startCoroutineUninterceptedOrReturn while newContext is installed
// through withCoroutineContext.
internal suspend fun <T, V> withContextUndispatched(
    newContext: CoroutineContext,
    value: V,
    countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
    block: suspend (V) -> T
): T {
    return suspendCoroutineUninterceptedOrReturn { caller ->
        withCoroutineContext(newContext, countOrElement) {
            // The completion carries newContext and links back to the caller's continuation
            val completion = StackFrameContinuation(caller, newContext)
            block.startCoroutineUninterceptedOrReturn(value, completion)
        }
    }
}
242 
// Continuation that delegates resumption to uCont and links to it as a walkable CoroutineStackFrame
private class StackFrameContinuation<T>(
    private val uCont: Continuation<T>,
    override val context: CoroutineContext
) : Continuation<T>, CoroutineStackFrame {

    // The caller's frame is uCont itself when it is a CoroutineStackFrame, otherwise there is none
    override val callerFrame: CoroutineStackFrame?
        get() {
            return uCont as? CoroutineStackFrame
        }

    // Resumption is passed straight through to the wrapped continuation
    override fun resumeWith(result: Result<T>): Unit = uCont.resumeWith(result)

    // This frame contributes no stack trace element of its own
    override fun getStackTraceElement(): StackTraceElement? {
        return null
    }
}
257