1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7
8 package kotlinx.coroutines.flow
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.coroutines.*
14 import kotlin.jvm.*
15
16 // -------------------------------- shareIn --------------------------------
17
18 /**
19 * Converts a _cold_ [Flow] into a _hot_ [SharedFlow] that is started in the given coroutine [scope],
20 * sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers,
21 * and replaying a specified number of [replay] values to new subscribers. See the [SharedFlow] documentation
22 * for the general concepts of shared flows.
23 *
24 * The starting of the sharing coroutine is controlled by the [started] parameter. The following options
25 * are supported.
26 *
27 * * [Eagerly][SharingStarted.Eagerly] — the upstream flow is started even before the first subscriber appears. Note
28 * that in this case all values emitted by the upstream beyond the most recent values as specified by
29 * [replay] parameter **will be immediately discarded**.
30 * * [Lazily][SharingStarted.Lazily] — starts the upstream flow after the first subscriber appears, which guarantees
31 * that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to
32 * get the most recent [replay] values. The upstream flow continues to be active even when all subscribers
33 * disappear, but only the most recent [replay] values are cached without subscribers.
34 * * [WhileSubscribed()][SharingStarted.WhileSubscribed] — starts the upstream flow when the first subscriber
35 * appears, immediately stops when the last subscriber disappears, keeping the replay cache forever.
36 * It has additional optional configuration parameters as explained in its documentation.
37 * * A custom strategy can be supplied by implementing the [SharingStarted] interface.
38 *
39 * The `shareIn` operator is useful in situations when there is a _cold_ flow that is expensive to create and/or
40 * to maintain, but there are multiple subscribers that need to collect its values. For example, consider a
41 * flow of messages coming from a backend over the expensive network connection, taking a lot of
42 * time to establish. Conceptually, it might be implemented like this:
43 *
44 * ```
45 * val backendMessages: Flow<Message> = flow {
46 * connectToBackend() // takes a lot of time
47 * try {
48 * while (true) {
49 * emit(receiveMessageFromBackend())
50 * }
51 * } finally {
52 * disconnectFromBackend()
53 * }
54 * }
55 * ```
56 *
57 * If this flow is directly used in the application, then every time it is collected a fresh connection is
58 * established, and it will take a while before messages start flowing. However, we can share a single connection
59 * and establish it eagerly like this:
60 *
61 * ```
62 * val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)
63 * ```
64 *
65 * Now a single connection is shared between all collectors from `messages`, and there is a chance that the connection
66 * is already established by the time it is needed.
67 *
68 * ### Upstream completion and error handling
69 *
 * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
72 * action on upstream completion is needed, then an [onCompletion] operator can be used before the
73 * `shareIn` operator to emit a special value in this case, like this:
74 *
75 * ```
76 * backendMessages
77 * .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
78 * .shareIn(scope, SharingStarted.Eagerly)
79 * ```
80 *
81 * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
82 * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
83 * can be configured by using the [catch] or [retry] operators before the `shareIn` operator.
84 * For example, to retry connection on any `IOException` with 1 second delay between attempts, use:
85 *
86 * ```
87 * val messages = backendMessages
88 * .retry { e ->
 *         val shallRetry = e is IOException // other exceptions are bugs - handle them
90 * if (shallRetry) delay(1000)
91 * shallRetry
92 * }
93 * .shareIn(scope, SharingStarted.Eagerly)
94 * ```
95 *
96 * ### Initial value
97 *
98 * When a special initial value is needed to signal to subscribers that the upstream is still loading the data,
99 * use the [onStart] operator on the upstream flow. For example:
100 *
101 * ```
102 * backendMessages
103 * .onStart { emit(UpstreamIsStartingMessage) }
104 * .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
105 * ```
106 *
107 * ### Buffering and conflation
108 *
109 * The `shareIn` operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained
110 * in the [buffer] operator's description, using a buffer of [replay] size or the default (whichever is larger).
111 * This default buffering can be overridden with an explicit buffer configuration by preceding the `shareIn` call
112 * with [buffer] or [conflate], for example:
113 *
114 * * `buffer(0).shareIn(scope, started, 0)` — overrides the default buffer size and creates a [SharedFlow] without a buffer.
115 * Effectively, it configures sequential processing between the upstream emitter and subscribers,
116 * as the emitter is suspended until all subscribers process the value. Note, that the value is still immediately
117 * discarded when there are no subscribers.
118 * * `buffer(b).shareIn(scope, started, r)` — creates a [SharedFlow] with `replay = r` and `extraBufferCapacity = b`.
119 * * `conflate().shareIn(scope, started, r)` — creates a [SharedFlow] with `replay = r`, `onBufferOverflow = DROP_OLDEST`,
120 * and `extraBufferCapacity = 1` when `replay == 0` to support this strategy.
121 *
122 * ### Operator fusion
123 *
124 * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
125 * or [cancellable] operators to the resulting shared flow has no effect.
126 *
127 * ### Exceptions
128 *
129 * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
130 *
131 * @param scope the coroutine scope in which sharing is started.
132 * @param started the strategy that controls when sharing is started and stopped.
133 * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
134 */
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    // Work out which upstream to collect and how much buffering the shared flow needs
    val sharing = configureSharing(replay)
    val hotFlow = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = sharing.extraBufferCapacity,
        onBufferOverflow = sharing.onBufferOverflow
    )
    // A plain shared flow has no initial value: the NO_VALUE marker is passed in its place
    @Suppress("UNCHECKED_CAST")
    val noInitialValue = NO_VALUE as T
    scope.launchSharing(sharing.context, sharing.upstream, hotFlow, started, noInitialValue)
    // Hand out a read-only view of the flow that the sharing coroutine emits into
    return hotFlow.asSharedFlow()
}
150
/**
 * Bundle of settings computed by [configureSharing] and read by `shareIn`/`stateIn`
 * when they create the hot flow and launch the sharing coroutine.
 */
private class SharingConfig<T>(
    // The flow that the sharing coroutine collects
    @JvmField val upstream: Flow<T>,
    // Passed as `extraBufferCapacity` when `shareIn` builds its MutableSharedFlow
    @JvmField val extraBufferCapacity: Int,
    // Passed as `onBufferOverflow` when `shareIn` builds its MutableSharedFlow
    @JvmField val onBufferOverflow: BufferOverflow,
    // Context handed to `launch` for the sharing coroutine
    @JvmField val context: CoroutineContext
)
157
// Decomposes the upstream flow so that sharing can be fused with a preceding buffer/flowOn
// (or another channel-using operator) whenever that is possible.
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
    assert { replay >= 0 }
    // Extra capacity that tops the replay cache up to the default channel capacity (never negative)
    val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
    if (this is ChannelFlow) {
        // A non-null result means this ChannelFlow can operate without its intermediate channel
        val channelFree = dropChannelOperators()
        if (channelFree != null) {
            // Capacities that do not denote an explicit positive buffer size
            val isSpecialCapacity =
                capacity == Channel.OPTIONAL_CHANNEL || capacity == Channel.BUFFERED || capacity == 0
            val fusedExtraCapacity = when {
                // an ordinary capacity is used as the extra capacity as-is
                !isSpecialCapacity -> capacity
                // buffer was configured with suspension: keep an explicit 0, otherwise use the default
                onBufferOverflow == BufferOverflow.SUSPEND -> if (capacity == 0) 0 else defaultExtraCapacity
                // no suspension and no replay => need a buffer of at least one
                replay == 0 -> 1
                // no suspension and replay > 0 => the replay cache is enough
                else -> 0
            }
            return SharingConfig(
                upstream = channelFree,
                extraBufferCapacity = fusedExtraCapacity,
                onBufferOverflow = onBufferOverflow,
                context = context
            )
        }
    }
    // Nothing to fuse with: share this flow directly with the default buffer
    return SharingConfig(
        upstream = this,
        extraBufferCapacity = defaultExtraCapacity,
        onBufferOverflow = BufferOverflow.SUSPEND,
        context = EmptyCoroutineContext
    )
}
192
// Launches the single coroutine that collects `upstream` into `shared` as directed by `started`.
// `initialValue` is NO_VALUE for a regular shared flow, or the value to reset to for a state flow.
private fun <T> CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow<T>,
    shared: MutableSharedFlow<T>,
    started: SharingStarted,
    initialValue: T
) {
    launch(context) {
        // Fast paths for the two simplest built-in strategies
        if (started === SharingStarted.Eagerly) {
            // collect right away
            upstream.collect(shared)
            return@launch
        }
        if (started === SharingStarted.Lazily) {
            // wait for the first subscriber, then collect
            shared.subscriptionCount.first { it > 0 }
            upstream.collect(shared)
            return@launch
        }
        // Any other (including custom) strategy: follow its stream of commands
        started.command(shared.subscriptionCount)
            .distinctUntilChanged() // react only when the command actually changes
            .collectLatest { command -> // a new command cancels whatever the previous one was doing
                when (command) {
                    SharingCommand.START -> upstream.collect(shared) // cancelled by the next command
                    SharingCommand.STOP -> {
                        // nothing to do: the previous collection has already been cancelled
                    }
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
                        if (initialValue === NO_VALUE) {
                            shared.resetReplayCache() // regular shared flow: drop the replay cache
                        } else {
                            shared.tryEmit(initialValue) // state flow: go back to the initial value
                        }
                    }
                }
            }
    }
}
234
235 // -------------------------------- stateIn --------------------------------
236
237 /**
238 * Converts a _cold_ [Flow] into a _hot_ [StateFlow] that is started in the given coroutine [scope],
239 * sharing the most recently emitted value from a single running instance of the upstream flow with multiple
240 * downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
241 *
242 * The starting of the sharing coroutine is controlled by the [started] parameter, as explained in the
243 * documentation for [shareIn] operator.
244 *
245 * The `stateIn` operator is useful in situations when there is a _cold_ flow that provides updates to the
246 * value of some state and is expensive to create and/or to maintain, but there are multiple subscribers
247 * that need to collect the most recent state value. For example, consider a
248 * flow of state updates coming from a backend over the expensive network connection, taking a lot of
249 * time to establish. Conceptually it might be implemented like this:
250 *
251 * ```
252 * val backendState: Flow<State> = flow {
253 * connectToBackend() // takes a lot of time
254 * try {
255 * while (true) {
256 * emit(receiveStateUpdateFromBackend())
257 * }
258 * } finally {
259 * disconnectFromBackend()
260 * }
261 * }
262 * ```
263 *
264 * If this flow is directly used in the application, then every time it is collected a fresh connection is
265 * established, and it will take a while before state updates start flowing. However, we can share a single connection
266 * and establish it eagerly like this:
267 *
268 * ```
 * val state: StateFlow<State> = backendState.stateIn(scope, SharingStarted.Eagerly, State.LOADING)
270 * ```
271 *
272 * Now, a single connection is shared between all collectors from `state`, and there is a chance that the connection
273 * is already established by the time it is needed.
274 *
275 * ### Upstream completion and error handling
276 *
 * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
279 * action on upstream completion is needed, then an [onCompletion] operator can be used before
280 * the `stateIn` operator to emit a special value in this case. See the [shareIn] operator's documentation for an example.
281 *
282 * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
283 * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
284 * can be configured by using the [catch] or [retry] operators before the `stateIn` operator, similarly to
285 * the [shareIn] operator.
286 *
287 * ### Operator fusion
288 *
289 * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
290 * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
291 * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
292 *
293 * @param scope the coroutine scope in which sharing is started.
294 * @param started the strategy that controls when sharing is started and stopped.
295 * @param initialValue the initial value of the state flow.
296 * This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
297 * with the `replayExpirationMillis` parameter.
298 */
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> {
    // A state flow holds exactly one value, so sharing is configured with replay = 1
    val sharing = configureSharing(1)
    return MutableStateFlow(initialValue)
        .also { state ->
            // The sharing coroutine writes upstream values into the freshly created state
            scope.launchSharing(sharing.context, sharing.upstream, state, started, initialValue)
        }
        .asStateFlow() // callers only get a read-only view
}
309
/**
 * Starts the upstream flow in a given [scope], suspends until the first value is emitted, and returns a _hot_
 * [StateFlow] of future emissions, sharing the most recently emitted value from this running instance of the upstream flow
 * with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
 *
 * This call never suspends indefinitely once the sharing coroutine has finished:
 *
 * * If the upstream flow fails before emitting its first value, that failure is rethrown from this call
 *   (and is also propagated to the [scope], as any failure of the sharing coroutine is).
 * * If the upstream flow completes without emitting any value, this call throws [NoSuchElementException].
 *   The sharing coroutine itself completes normally in this case, so the [scope] is not affected.
 * * If the sharing coroutine is cancelled before the first value is emitted (including the case when
 *   the [scope] is already cancelled and the coroutine never gets to run), the cancellation cause
 *   is rethrown from this call.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @throws NoSuchElementException if the upstream flow completes without emitting any value.
 */
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
    val config = configureSharing(1)
    // Completed by the sharing coroutine with the state flow, or with the reason why there is none
    val result = CompletableDeferred<StateFlow<T>>()
    scope.launchSharingDeferred(config.context, config.upstream, result)
    return result.await()
}

// Launches the sharing coroutine for `stateIn(scope)`.
// The state flow is created from the first upstream value and delivered through `result`;
// every other outcome completes `result` exceptionally so that the waiter is always resumed.
private fun <T> CoroutineScope.launchSharingDeferred(
    context: CoroutineContext,
    upstream: Flow<T>,
    result: CompletableDeferred<StateFlow<T>>
) {
    val sharingJob = launch(context) {
        try {
            var state: MutableStateFlow<T>? = null
            upstream.collect { value ->
                // The first value creates the state and releases the waiter, later values update it
                state?.let { it.value = value } ?: run {
                    state = MutableStateFlow(value).also {
                        result.complete(it.asStateFlow())
                    }
                }
            }
            if (state == null) {
                // Upstream completed without a single value: there is no state to return,
                // so fail the waiter instead of leaving it suspended forever
                result.completeExceptionally(NoSuchElementException("Flow is empty"))
            }
        } catch (e: Throwable) {
            // Notify the waiter that the flow has failed
            result.completeExceptionally(e)
            // But still cancel the scope where state was (not) produced
            throw e
        }
    }
    // The body above does not run at all when the coroutine is cancelled before it starts
    // (e.g. the scope is already cancelled), so the waiter is also released from the completion handler.
    // This is a no-op when `result` has already been completed.
    sharingJob.invokeOnCompletion { cause ->
        if (cause != null) result.completeExceptionally(cause)
    }
}
347
348 // -------------------------------- asSharedFlow/asStateFlow --------------------------------
349
/**
 * Wraps this mutable shared flow into a [SharedFlow] view that can only be read from.
 */
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> = ReadonlySharedFlow(this)
355
/**
 * Wraps this mutable state flow into a [StateFlow] view that can only be read from.
 */
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> = ReadonlyStateFlow(this)
361
/**
 * Read-only view over a shared flow: every [SharedFlow] member is delegated to the wrapped `flow`,
 * so only the [SharedFlow] API of the wrapped instance is exposed through this type.
 */
private class ReadonlySharedFlow<T>(
    flow: SharedFlow<T>
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    // Fusion with subsequent operators is delegated to the common shared-flow helper
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)
}
368
/**
 * Read-only view over a state flow: every [StateFlow] member is delegated to the wrapped `flow`,
 * so only the [StateFlow] API of the wrapped instance is exposed through this type.
 */
private class ReadonlyStateFlow<T>(
    flow: StateFlow<T>
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    // Fusion with subsequent operators is delegated to the common state-flow helper
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseStateFlow(context, capacity, onBufferOverflow)
}
375
376 // -------------------------------- onSubscription --------------------------------
377
/**
 * Returns a shared flow that runs the given [action] each time it starts being collected,
 * **after** the subscription to this shared flow has been registered.
 *
 * The [action] runs once the subscription is established but before any value from the upstream flow
 * is delivered to this subscription. Because of that, every emission to the upstream flow that happens
 * inside the `onSubscription` action, or immediately after it, is guaranteed to be collected by this subscription.
 *
 * The [action] has a [FlowCollector] receiver, so it can emit additional elements to the subscriber.
 */
public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> =
    SubscribedSharedFlow(sharedFlow = this, action = action)
391
/**
 * Shared flow returned by `onSubscription`: delegates every [SharedFlow] member to [sharedFlow],
 * except for `collect`, which is overridden to pass the [action] along with the collector.
 */
private class SubscribedSharedFlow<T>(
    private val sharedFlow: SharedFlow<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : SharedFlow<T> by sharedFlow {
    // Wrap the collector so that the action travels with it into the underlying shared flow's collect
    override suspend fun collect(collector: FlowCollector<T>) =
        sharedFlow.collect(SubscribedFlowCollector(collector, action))
}
399
/**
 * A collector that carries an `onSubscription` [action] together with the downstream [collector].
 * Emissions are delegated to the [collector] unchanged.
 */
internal class SubscribedFlowCollector<T>(
    private val collector: FlowCollector<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
    /**
     * Runs this collector's [action] against the wrapped [collector], then lets the wrapped collector
     * run its own action when it is a [SubscribedFlowCollector] as well.
     */
    suspend fun onSubscription() {
        // The action emits through a SafeCollector bound to the current coroutine context
        val guarded = SafeCollector(collector, currentCoroutineContext())
        try {
            guarded.action()
        } finally {
            // Always release the SafeCollector, even when the action fails
            guarded.releaseIntercepted()
        }
        // Chained onSubscription operators: continue with the next wrapped collector, if any
        (collector as? SubscribedFlowCollector<*>)?.onSubscription()
    }
}
414