• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.internal.IgnoreJreRequirement
9 import kotlin.time.*
10 
/**
 * An instruction produced by a [SharingStarted] strategy to drive the sharing coroutine of the
 * [shareIn] and [stateIn] operators.
 */
public enum class SharingCommand {
    /**
     * Begins sharing: collection of the upstream flow is launched.
     *
     * Repeating this command while sharing is already started does nothing. To restart the upstream
     * flow, emit [STOP] first and then [START].
     */
    START,

    /**
     * Ends sharing: collection of the upstream flow is cancelled.
     */
    STOP,

    /**
     * Ends sharing by cancelling collection of the upstream flow and, in addition, returns the
     * [SharedFlow.replayCache] to its initial state:
     * the [shareIn] operator does so by calling [MutableSharedFlow.resetReplayCache], while
     * the [stateIn] operator puts its original `initialValue` back.
     */
    STOP_AND_RESET_REPLAY_CACHE
}
37 
/**
 * Decides when the sharing coroutine of the [shareIn] and [stateIn] operators is started and stopped.
 *
 * Built-in strategies are available as [Eagerly], [Lazily], and [WhileSubscribed]. Because this is a
 * functional interface, a custom strategy only has to implement the [command] function.
 *
 * As an illustration, the strategy below keeps the upstream started only while the number of subscribers
 * is at least the given `threshold`. Declaring it as an extension on [SharingStarted.Companion] makes it
 * read like a built-in strategy at the use-site:
 *
 * ```
 * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) =
 *     SharingStarted { subscriptionCount: StateFlow<Int> ->
 *         subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
 *     }
 * ```
 *
 * ### Commands
 *
 * A strategy controls the upstream flow by emitting [commands][SharingCommand] from the flow returned by
 * its [`command`][command] function. Emitting the same command several times in a row has no effect;
 * only a command that differs from the previous one is acted upon:
 *
 * * [START][SharingCommand.START] &mdash; the upstream flow is started.
 * * [STOP][SharingCommand.STOP] &mdash; the upstream flow is stopped.
 * * [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] &mdash;
 *   the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state.
 *   The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
 *   the [stateIn] operator resets the value to its original `initialValue`.
 *
 * The upstream flow begins stopped and in its initial state, so emitting
 * [STOP][SharingCommand.STOP] or [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE]
 * before the first [START][SharingCommand.START] has no effect.
 *
 * Normal completion of the `command` flow has no effect (a running upstream flow keeps running).
 * Failure of the `command` flow cancels both the sharing coroutine and the upstream flow.
 */
public fun interface SharingStarted {
    /**
     * Converts the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow into
     * the flow of [commands][SharingCommand] that drive the sharing coroutine. The [SharingStarted]
     * interface documentation describes how the commands are interpreted.
     */
    public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>

    public companion object {
        /**
         * Sharing starts immediately and is never stopped.
         */
        public val Eagerly: SharingStarted = StartedEagerly()

        /**
         * Sharing starts once the first subscriber appears and is never stopped.
         */
        public val Lazily: SharingStarted = StartedLazily()

        /**
         * Sharing starts once the first subscriber appears and, by default, stops immediately when the
         * last subscriber disappears while keeping the replay cache forever.
         *
         * Both defaults can be changed through the optional parameters:
         *
         * * [stopTimeoutMillis] &mdash; the delay (in milliseconds) between the disappearance of the last
         *   subscriber and the stopping of the sharing coroutine. Defaults to zero (stop immediately).
         * * [replayExpirationMillis] &mdash; the delay (in milliseconds) between the stopping of
         *   the sharing coroutine and the resetting of the replay cache (which empties the cache for the
         *   [shareIn] operator and restores the original `initialValue` for the [stateIn] operator).
         *   Defaults to `Long.MAX_VALUE` (keep the replay cache forever, never reset it).
         *   Pass zero to expire the cache immediately.
         *
         * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or
         * [replayExpirationMillis] is negative.
         */
        @Suppress("FunctionName")
        public fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted {
            return StartedWhileSubscribed(
                stopTimeout = stopTimeoutMillis,
                replayExpiration = replayExpirationMillis
            )
        }
    }
}
119 
/**
 * Sharing is started when the first subscriber appears, immediately stops when the last
 * subscriber disappears (by default), keeping the replay cache forever (by default).
 *
 * It has the following optional parameters:
 *
 * * [stopTimeout] &mdash; configures a delay between the disappearance of the last
 *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
 * * [replayExpiration] &mdash; configures a delay between the stopping of
 *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
 *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
 *   It defaults to [Duration.INFINITE] (keep replay cache forever, never reset it).
 *   Use [Duration.ZERO] value to expire the cache immediately.
 *
 * Both durations are converted to a whole number of milliseconds, so a positive duration shorter than
 * one millisecond is treated as zero.
 *
 * @throws IllegalArgumentException when either [stopTimeout] or [replayExpiration] is negative,
 * including negative durations shorter than one millisecond.
 */
@Suppress("FunctionName")
public fun SharingStarted.Companion.WhileSubscribed(
    stopTimeout: Duration = Duration.ZERO,
    replayExpiration: Duration = Duration.INFINITE
): SharingStarted {
    // inWholeMilliseconds truncates towards zero, so a negative duration shorter than one millisecond
    // would become 0 and slip past the millisecond-based checks; reject negative durations up front.
    require(!stopTimeout.isNegative()) { "stopTimeout($stopTimeout) cannot be negative" }
    require(!replayExpiration.isNegative()) { "replayExpiration($replayExpiration) cannot be negative" }
    return StartedWhileSubscribed(stopTimeout.inWholeMilliseconds, replayExpiration.inWholeMilliseconds)
}
143 
144 // -------------------------------- implementation --------------------------------
145 
/**
 * Strategy that requests the start of sharing right away, regardless of subscribers.
 */
private class StartedEagerly : SharingStarted {
    override fun toString(): String = "SharingStarted.Eagerly"

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        // The subscriber count is not consulted: a single START is the only command ever produced.
        return flowOf(SharingCommand.START)
    }
}
151 
/**
 * Strategy that requests the start of sharing the first time the subscriber count becomes positive.
 * No other command is ever emitted, so sharing is never stopped by this strategy.
 */
private class StartedLazily : SharingStarted {
    override fun toString(): String = "SharingStarted.Lazily"

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
        var alreadyStarted = false
        subscriptionCount.collect { subscribers ->
            // Ignore every update once START was sent, and every update without subscribers.
            if (alreadyStarted || subscribers <= 0) return@collect
            alreadyStarted = true
            emit(SharingCommand.START)
        }
    }
}
165 
/**
 * Strategy that starts sharing while there are subscribers and, once the count drops to zero,
 * stops it after [stopTimeout] milliseconds and resets the replay cache after a further
 * [replayExpiration] milliseconds.
 *
 * Construction fails with [IllegalArgumentException] if either value is negative.
 */
private class StartedWhileSubscribed(
    private val stopTimeout: Long,
    private val replayExpiration: Long
) : SharingStarted {
    init {
        require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
        require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
    }

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
        // transformLatest cancels the block still running for the previous count when a new count arrives,
        // so a subscriber that shows up during either delay aborts the pending STOP / reset.
        val rawCommands: Flow<SharingCommand> = subscriptionCount.transformLatest { subscribers ->
            if (subscribers > 0) {
                emit(SharingCommand.START)
                return@transformLatest
            }
            // No subscribers left: wait out the stop timeout first.
            delay(stopTimeout)
            if (replayExpiration > 0) {
                // Stop now and keep the replay cache until the expiration delay has elapsed as well.
                emit(SharingCommand.STOP)
                delay(replayExpiration)
            }
            emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
        }
        return rawCommands
            // nothing is running initially, so skip any STOP / STOP_AND_RESET_REPLAY_CACHE that precedes the first START
            .dropWhile { command -> command != SharingCommand.START }
            // defensive: do not leak repeated emissions of the same command (e.g. several STARTs in a row)
            .distinctUntilChanged()
    }

    override fun toString(): String {
        // Only parameters that differ from their defaults are listed.
        val params = listOfNotNull(
            if (stopTimeout > 0) "stopTimeout=${stopTimeout}ms" else null,
            if (replayExpiration < Long.MAX_VALUE) "replayExpiration=${replayExpiration}ms" else null
        )
        return "SharingStarted.WhileSubscribed(${params.joinToString()})"
    }

    // equals & hashcode to facilitate testing, not documented in public contract
    override fun equals(other: Any?): Boolean {
        if (other !is StartedWhileSubscribed) return false
        return stopTimeout == other.stopTimeout && replayExpiration == other.replayExpiration
    }

    @IgnoreJreRequirement // desugared hashcode implementation
    override fun hashCode(): Int = 31 * stopTimeout.hashCode() + replayExpiration.hashCode()
}
209