/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 
8 package kotlinx.coroutines.flow
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.coroutines.*
14 import kotlin.jvm.*
15 
16 // -------------------------------- shareIn --------------------------------
17 
18 /**
19  * Converts a _cold_ [Flow] into a _hot_ [SharedFlow] that is started in the given coroutine [scope],
20  * sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers,
21  * and replaying a specified number of [replay] values to new subscribers. See the [SharedFlow] documentation
22  * for the general concepts of shared flows.
23  *
24  * The starting of the sharing coroutine is controlled by the [started] parameter. The following options
25  * are supported.
26  *
27  * * [Eagerly][SharingStarted.Eagerly] &mdash; the upstream flow is started even before the first subscriber appears. Note
28  *   that in this case all values emitted by the upstream beyond the most recent values as specified by
29  *   [replay] parameter **will be immediately discarded**.
30  * * [Lazily][SharingStarted.Lazily] &mdash; starts the upstream flow after the first subscriber appears, which guarantees
31  *   that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to
32  *   get the most recent [replay] values. The upstream flow continues to be active even when all subscribers
33  *   disappear, but only the most recent [replay] values are cached without subscribers.
34  * * [WhileSubscribed()][SharingStarted.WhileSubscribed] &mdash; starts the upstream flow when the first subscriber
35  *   appears, immediately stops when the last subscriber disappears, keeping the replay cache forever.
36  *   It has additional optional configuration parameters as explained in its documentation.
37  * * A custom strategy can be supplied by implementing the [SharingStarted] interface.
38  *
39  * The `shareIn` operator is useful in situations when there is a _cold_ flow that is expensive to create and/or
40  * to maintain, but there are multiple subscribers that need to collect its values. For example, consider a
41  * flow of messages coming from a backend over the expensive network connection, taking a lot of
42  * time to establish. Conceptually, it might be implemented like this:
43  *
44  * ```
45  * val backendMessages: Flow<Message> = flow {
46  *     connectToBackend() // takes a lot of time
47  *     try {
48  *       while (true) {
49  *           emit(receiveMessageFromBackend())
50  *       }
51  *     } finally {
52  *         disconnectFromBackend()
53  *     }
54  * }
55  * ```
56  *
57  * If this flow is directly used in the application, then every time it is collected a fresh connection is
58  * established, and it will take a while before messages start flowing. However, we can share a single connection
59  * and establish it eagerly like this:
60  *
61  * ```
62  * val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)
63  * ```
64  *
65  * Now a single connection is shared between all collectors from `messages`, and there is a chance that the connection
66  * is already established by the time it is needed.
67  *
68  * ### Upstream completion and error handling
69  *
70  * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
71  * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
72  * action on upstream completion is needed, then an [onCompletion] operator can be used before the
73  * `shareIn` operator to emit a special value in this case, like this:
74  *
75  * ```
76  * backendMessages
77  *     .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
78  *     .shareIn(scope, SharingStarted.Eagerly)
79  * ```
80  *
81  * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
82  * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
83  * can be configured by using the [catch] or [retry] operators before the `shareIn` operator.
84  * For example, to retry connection on any `IOException` with 1 second delay between attempts, use:
85  *
86  * ```
87  * val messages = backendMessages
88  *     .retry { e ->
 *         val shallRetry = e is IOException // other exceptions are bugs - handle them
90  *         if (shallRetry) delay(1000)
91  *         shallRetry
92  *     }
93  *     .shareIn(scope, SharingStarted.Eagerly)
94  * ```
95  *
96  * ### Initial value
97  *
98  * When a special initial value is needed to signal to subscribers that the upstream is still loading the data,
99  * use the [onStart] operator on the upstream flow. For example:
100  *
101  * ```
102  * backendMessages
103  *     .onStart { emit(UpstreamIsStartingMessage) }
104  *     .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
105  * ```
106  *
107  * ### Buffering and conflation
108  *
109  * The `shareIn` operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained
110  * in the [buffer] operator's description, using a buffer of [replay] size or the default (whichever is larger).
111  * This default buffering can be overridden with an explicit buffer configuration by preceding the `shareIn` call
112  * with [buffer] or [conflate], for example:
113  *
114  * * `buffer(0).shareIn(scope, started, 0)` &mdash; overrides the default buffer size and creates a [SharedFlow] without a buffer.
115  *   Effectively, it configures sequential processing between the upstream emitter and subscribers,
116  *   as the emitter is suspended until all subscribers process the value. Note, that the value is still immediately
117  *   discarded when there are no subscribers.
118  * * `buffer(b).shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r` and `extraBufferCapacity = b`.
119  * * `conflate().shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r`, `onBufferOverflow = DROP_OLDEST`,
120  *   and `extraBufferCapacity = 1` when `replay == 0` to support this strategy.
121  *
122  * ### Operator fusion
123  *
124  * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
125  * or [cancellable] operators to the resulting shared flow has no effect.
126  *
127  * ### Exceptions
128  *
129  * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
130  *
131  * @param scope the coroutine scope in which sharing is started.
132  * @param started the strategy that controls when sharing is started and stopped.
133  * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
134  */
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    // Fuse with preceding channel operators (when possible) to learn the effective upstream, context and buffering
    val sharing = configureSharing(replay)
    val hotFlow = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = sharing.extraBufferCapacity,
        onBufferOverflow = sharing.onBufferOverflow
    )
    // A plain shared flow has no initial value: the NO_VALUE marker tells the sharing coroutine to reset
    // the replay cache (rather than re-emit an initial value) on STOP_AND_RESET_REPLAY_CACHE
    @Suppress("UNCHECKED_CAST")
    val noInitialValue = NO_VALUE as T
    val sharingJob = scope.launchSharing(
        context = sharing.context,
        upstream = sharing.upstream,
        shared = hotFlow,
        started = started,
        initialValue = noInitialValue
    )
    // The read-only wrapper also holds on to the sharing job
    return ReadonlySharedFlow(hotFlow, sharingJob)
}
150 
/**
 * Outcome of [configureSharing]: everything that is needed to create and launch a sharing coroutine.
 *
 * @property upstream the flow that the sharing coroutine collects.
 * @property extraBufferCapacity buffer capacity to allocate in the shared flow in addition to the replay.
 * @property onBufferOverflow the action to take when the buffer of the shared flow overflows.
 * @property context the coroutine context to launch the sharing coroutine with.
 */
private class SharingConfig<T>(
    @JvmField val upstream: Flow<T>,
    @JvmField val extraBufferCapacity: Int,
    @JvmField val onBufferOverflow: BufferOverflow,
    @JvmField val context: CoroutineContext
)
157 
/**
 * Works out how this flow should be shared with the given [replay]: which upstream to collect, how much
 * extra buffer capacity to allocate, what to do on buffer overflow, and which context to launch sharing with.
 *
 * When the receiver is a [ChannelFlow] that can operate without its channel, the configuration of that
 * channel flow is fused into the result. Otherwise the receiver itself is shared with a default buffer.
 */
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
    assert { replay >= 0 }
    val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
    // Only a ChannelFlow that can drop its channel operators is eligible for fusion
    val channelFlow = this as? ChannelFlow<T>
    val channelFreeUpstream = channelFlow?.dropChannelOperators()
    if (channelFlow == null || channelFreeUpstream == null) {
        // Nothing to fuse with => add sharing operator on top with a default buffer
        return SharingConfig(
            upstream = this,
            extraBufferCapacity = defaultExtraCapacity,
            onBufferOverflow = BufferOverflow.SUSPEND,
            context = EmptyCoroutineContext
        )
    }
    // Fusion is possible => eliminate the intermediate channel and take over its configuration
    val configuredCapacity = channelFlow.capacity
    val overflow = channelFlow.onBufferOverflow
    val isSpecialCapacity = configuredCapacity == Channel.OPTIONAL_CHANNEL ||
        configuredCapacity == Channel.BUFFERED ||
        configuredCapacity == 0
    val extraCapacity = when {
        // an ordinary capacity is used as the extra capacity as is
        !isSpecialCapacity -> configuredCapacity
        // buffer was configured with suspension: keep explicitly configured 0 or use the default
        overflow == BufferOverflow.SUSPEND -> if (configuredCapacity == 0) 0 else defaultExtraCapacity
        // no suspension and nothing is replayed => need a buffer of at least one
        replay == 0 -> 1
        // no suspension and replay > 0 => the replay buffer alone is sufficient
        else -> 0
    }
    return SharingConfig(
        upstream = channelFreeUpstream,
        extraBufferCapacity = extraCapacity,
        onBufferOverflow = overflow,
        context = channelFlow.context
    )
}
192 
/**
 * Launches the single coroutine that collects [upstream] into [shared] as directed by the [started] strategy.
 *
 * @param context the context to launch the sharing coroutine with.
 * @param upstream the flow whose emissions are shared.
 * @param shared the hot flow that receives the emissions of [upstream].
 * @param started the strategy that controls when sharing is started and stopped.
 * @param initialValue the value to reset [shared] to on [SharingCommand.STOP_AND_RESET_REPLAY_CACHE],
 *   or `NO_VALUE` when the replay cache shall be reset instead.
 * @return the job of the sharing coroutine.
 */
private fun <T> CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow<T>,
    shared: MutableSharedFlow<T>,
    started: SharingStarted,
    initialValue: T
): Job {
    /*
     * Conditional start. When sharing and subscribing happen in the same dispatcher, the following
     * invariants are to be preserved:
     * * Delayed sharing strategies get a chance to immediately observe consecutive subscriptions.
     *   E.g. in the cases like `flow.shareIn(...); flow.take(1)` the sharing strategy shall see the initial subscription.
     * * Eager sharing does not start immediately, so the subscribers have an actual chance to subscribe _prior_ to sharing.
     */
    val startMode = if (started != SharingStarted.Eagerly) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT
    return launch(context, start = startMode) { // the one and only coroutine that performs the sharing
        // The common built-in strategies are special-cased as an optimization
        if (started === SharingStarted.Eagerly) {
            // collect right away & forever
            upstream.collect(shared)
        } else if (started === SharingStarted.Lazily) {
            // wait for the first subscriber, then collect forever
            shared.subscriptionCount.first { it > 0 }
            upstream.collect(shared)
        } else {
            // all the other strategies, including custom ones, are driven by their command flow
            val commands = started.command(shared.subscriptionCount)
                .distinctUntilChanged() // only a change of command has an effect
            commands.collectLatest { command -> // a new command cancels the block that handles the previous one
                when (command) {
                    SharingCommand.START -> upstream.collect(shared) // can be cancelled
                    SharingCommand.STOP -> { /* cancelling the previous block is all that is needed */ }
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
                        if (initialValue === NO_VALUE) {
                            shared.resetReplayCache() // regular shared flow -> reset cache
                        } else {
                            shared.tryEmit(initialValue) // state flow -> reset to initial value
                        }
                    }
                }
            }
        }
    }
}
242 
243 // -------------------------------- stateIn --------------------------------
244 
245 /**
246  * Converts a _cold_ [Flow] into a _hot_ [StateFlow] that is started in the given coroutine [scope],
247  * sharing the most recently emitted value from a single running instance of the upstream flow with multiple
248  * downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
249  *
250  * The starting of the sharing coroutine is controlled by the [started] parameter, as explained in the
251  * documentation for [shareIn] operator.
252  *
253  * The `stateIn` operator is useful in situations when there is a _cold_ flow that provides updates to the
254  * value of some state and is expensive to create and/or to maintain, but there are multiple subscribers
255  * that need to collect the most recent state value. For example, consider a
256  * flow of state updates coming from a backend over the expensive network connection, taking a lot of
257  * time to establish. Conceptually it might be implemented like this:
258  *
259  * ```
260  * val backendState: Flow<State> = flow {
261  *     connectToBackend() // takes a lot of time
262  *     try {
263  *       while (true) {
264  *           emit(receiveStateUpdateFromBackend())
265  *       }
266  *     } finally {
267  *         disconnectFromBackend()
268  *     }
269  * }
270  * ```
271  *
272  * If this flow is directly used in the application, then every time it is collected a fresh connection is
273  * established, and it will take a while before state updates start flowing. However, we can share a single connection
274  * and establish it eagerly like this:
275  *
276  * ```
 * val state: StateFlow<State> = backendState.stateIn(scope, SharingStarted.Eagerly, State.LOADING)
278  * ```
279  *
280  * Now, a single connection is shared between all collectors from `state`, and there is a chance that the connection
281  * is already established by the time it is needed.
282  *
283  * ### Upstream completion and error handling
284  *
 * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
287  * action on upstream completion is needed, then an [onCompletion] operator can be used before
288  * the `stateIn` operator to emit a special value in this case. See the [shareIn] operator's documentation for an example.
289  *
290  * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
291  * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
292  * can be configured by using the [catch] or [retry] operators before the `stateIn` operator, similarly to
293  * the [shareIn] operator.
294  *
295  * ### Operator fusion
296  *
297  * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
298  * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
299  * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
300  *
301  * @param scope the coroutine scope in which sharing is started.
302  * @param started the strategy that controls when sharing is started and stopped.
303  * @param initialValue the initial value of the state flow.
304  *   This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
305  *   with the `replayExpirationMillis` parameter.
306  */
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> {
    // Sharing is configured with a replay of one value
    val sharing = configureSharing(1)
    val stateHolder = MutableStateFlow(initialValue)
    // The initial value is handed over to the sharing coroutine, too, so that the state can be reset to it
    val sharingJob = scope.launchSharing(
        context = sharing.context,
        upstream = sharing.upstream,
        shared = stateHolder,
        started = started,
        initialValue = initialValue
    )
    // The read-only wrapper also holds on to the sharing job
    return ReadonlyStateFlow(stateHolder, sharingJob)
}
317 
/**
 * Starts the upstream flow in a given [scope], suspends until the first value is emitted, and returns a _hot_
 * [StateFlow] of future emissions, sharing the most recently emitted value from this running instance of the upstream flow
 * with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
 *
 * If the upstream flow fails before emitting the first value, the failure is rethrown from this function.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @throws NoSuchElementException if the upstream flow completes without emitting any value.
 */
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
    val config = configureSharing(1)
    val result = CompletableDeferred<StateFlow<T>>()
    scope.launchSharingDeferred(config.context, config.upstream, result)
    return result.await()
}

/**
 * Launches a coroutine that collects [upstream] into a state flow which is created on the first emission
 * and is delivered to the waiter via [result].
 *
 * [result] is always completed once the launched coroutine finishes collecting: with the state flow on
 * the first emission, with the failure when collection throws, and with [NoSuchElementException] when the
 * upstream completes normally without a single emission. Without the latter the waiter would suspend forever.
 *
 * @param context the context to launch the sharing coroutine with.
 * @param upstream the flow whose emissions become the values of the state flow.
 * @param result the deferred that the caller awaits to receive the state flow.
 */
private fun <T> CoroutineScope.launchSharingDeferred(
    context: CoroutineContext,
    upstream: Flow<T>,
    result: CompletableDeferred<StateFlow<T>>
) {
    launch(context) {
        try {
            var state: MutableStateFlow<T>? = null
            upstream.collect { value ->
                // The first emission creates the state flow and releases the waiter, the rest just update it
                state?.let { it.value = value } ?: run {
                    state = MutableStateFlow(value).also {
                        result.complete(ReadonlyStateFlow(it, coroutineContext.job))
                    }
                }
            }
            if (state == null) {
                // Upstream has completed without values: there is no state to share, so fail the waiter
                // instead of leaving it suspended. This is not a failure of the sharing coroutine itself.
                result.completeExceptionally(NoSuchElementException("Flow is empty"))
            }
        } catch (e: Throwable) {
            // Notify the waiter that the flow has failed (no effect if the state flow was already delivered)
            result.completeExceptionally(e)
            // But still cancel the scope where state was (not) produced
            throw e
        }
    }
}
355 
356 // -------------------------------- asSharedFlow/asStateFlow --------------------------------
357 
/**
 * Returns a read-only [SharedFlow] view of this mutable shared flow.
 */
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> {
    // No job is associated with this wrapper
    return ReadonlySharedFlow(this, job = null)
}
363 
/**
 * Returns a read-only [StateFlow] view of this mutable state flow.
 */
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> {
    // No job is associated with this wrapper
    return ReadonlyStateFlow(this, job = null)
}
369 
/**
 * Read-only [SharedFlow] wrapper: all the [SharedFlow] members are delegated to the wrapped flow,
 * while operator fusion is routed through [fuseSharedFlow].
 */
private class ReadonlySharedFlow<T>(
    flow: SharedFlow<T>,
    @Suppress("unused")
    private val job: Job? // keeps a strong reference to the job (if present)
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        return fuseSharedFlow(context, capacity, onBufferOverflow)
    }
}
378 
/**
 * Read-only [StateFlow] wrapper: all the [StateFlow] members are delegated to the wrapped flow,
 * while operator fusion is routed through [fuseStateFlow].
 */
private class ReadonlyStateFlow<T>(
    flow: StateFlow<T>,
    @Suppress("unused")
    private val job: Job? // keeps a strong reference to the job (if present)
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        return fuseStateFlow(context, capacity, onBufferOverflow)
    }
}
387 
388 // -------------------------------- onSubscription --------------------------------
389 
/**
 * Returns a flow that runs the given [action] **after** this shared flow starts to be collected,
 * that is, once the subscription has been registered.
 *
 * The [action] runs after the subscription is established, but before any value from the upstream flow is
 * delivered to this subscription. All emissions to the upstream flow that happen inside or immediately
 * after this `onSubscription` action are guaranteed to be collected by this subscription.
 *
 * The [action] has [FlowCollector] as its receiver, so `onSubscription` can emit additional elements.
 */
public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> {
    return SubscribedSharedFlow(this, action)
}
403 
/**
 * A [SharedFlow] that delegates to [sharedFlow], but wraps every collector into a [SubscribedFlowCollector]
 * that carries the [action] before handing it over to [sharedFlow].
 */
private class SubscribedSharedFlow<T>(
    private val sharedFlow: SharedFlow<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : SharedFlow<T> by sharedFlow {
    override suspend fun collect(collector: FlowCollector<T>) =
        SubscribedFlowCollector(collector, action).let { subscribed -> sharedFlow.collect(subscribed) }
}
411 
/**
 * A [FlowCollector] that delegates to [collector] and additionally carries an [action] to run via [onSubscription].
 */
internal class SubscribedFlowCollector<T>(
    private val collector: FlowCollector<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
    /**
     * Runs the [action] with a [SafeCollector] around [collector] as its receiver, and then does the same
     * for the wrapped [collector] if it is a [SubscribedFlowCollector] itself.
     */
    suspend fun onSubscription() {
        val guarded = SafeCollector(collector, currentCoroutineContext())
        try {
            action(guarded)
        } finally {
            // The safe collector is released no matter how the action has finished
            guarded.releaseIntercepted()
        }
        // Not reached when the action has failed: the nested actions run only after this one succeeds
        (collector as? SubscribedFlowCollector<*>)?.onSubscription()
    }
}
426