• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.time.*
10 
11 /**
12  * A command emitted by [SharingStarted] implementations to control the sharing coroutine in
13  * the [shareIn] and [stateIn] operators.
14  */
15 public enum class SharingCommand {
16     /**
17      * Starts sharing, launching collection of the upstream flow.
18      *
19      * Emitting this command again does not do anything. Emit [STOP] and then [START] to restart an
20      * upstream flow.
21      */
22     START,
23 
24     /**
25      * Stops sharing, cancelling collection of the upstream flow.
26      */
27     STOP,
28 
29     /**
30      * Stops sharing, cancelling collection of the upstream flow, and resets the [SharedFlow.replayCache]
31      * to its initial state.
32      * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
33      * the [stateIn] operator resets the value to its original `initialValue`.
34      */
35     STOP_AND_RESET_REPLAY_CACHE
36 }
37 
38 /**
39  * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators.
40  *
41  * This interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
42  * supports custom strategies by implementing this interface's [command] function.
43  *
44  * For example, it is possible to define a custom strategy that starts the upstream only when the number
45  * of subscribers exceeds the given `threshold` and make it an extension on [SharingStarted.Companion] so
46  * that it looks like a built-in strategy on the use-site:
47  *
48  * ```
49  * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted =
50  *     object : SharingStarted {
51  *         override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
52  *             subscriptionCount
53  *                 .map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
54  *     }
55  * ```
56  *
57  * ### Commands
58  *
59  * The `SharingStarted` strategy works by emitting [commands][SharingCommand] that control upstream flow from its
60  * [`command`][command] flow implementation function. Back-to-back emissions of the same command have no effect.
61  * Only emission of a different command has effect:
62  *
63  * * [START][SharingCommand.START] &mdash; the upstream flow is stared.
64  * * [STOP][SharingCommand.STOP] &mdash; the upstream flow is stopped.
65  * * [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] &mdash;
66  *   the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state.
67  *   The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
68  *   the [stateIn] operator resets the value to its original `initialValue`.
69  *
70  * Initially, the upstream flow is stopped and is in the initial state, so the emission of additional
71  * [STOP][SharingCommand.STOP] and [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] commands will
72  * have no effect.
73  *
74  * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
75  * The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
76  */
77 public interface SharingStarted {
78     public companion object {
79         /**
80          * Sharing is started immediately and never stops.
81          */
82         public val Eagerly: SharingStarted = StartedEagerly()
83 
84         /**
85          * Sharing is started when the first subscriber appears and never stops.
86          */
87         public val Lazily: SharingStarted = StartedLazily()
88 
89         /**
90          * Sharing is started when the first subscriber appears, immediately stops when the last
91          * subscriber disappears (by default), keeping the replay cache forever (by default).
92          *
93          * It has the following optional parameters:
94          *
95          * * [stopTimeoutMillis] &mdash; configures a delay (in milliseconds) between the disappearance of the last
96          *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
97          * * [replayExpirationMillis] &mdash; configures a delay (in milliseconds) between the stopping of
98          *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
99          *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
100          *   It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset buffer).
101          *   Use zero value to expire the cache immediately.
102          *
103          * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis]
104          * are negative.
105          */
106         @Suppress("FunctionName")
WhileSubscribednull107         public fun WhileSubscribed(
108             stopTimeoutMillis: Long = 0,
109             replayExpirationMillis: Long = Long.MAX_VALUE
110         ): SharingStarted =
111             StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
112     }
113 
114     /**
115      * Transforms the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow into the
116      * flow of [commands][SharingCommand] that control the sharing coroutine. See the [SharingStarted] interface
117      * documentation for details.
118      */
119     public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
120 }
121 
122 /**
123  * Sharing is started when the first subscriber appears, immediately stops when the last
124  * subscriber disappears (by default), keeping the replay cache forever (by default).
125  *
126  * It has the following optional parameters:
127  *
128  * * [stopTimeout] &mdash; configures a delay between the disappearance of the last
129  *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
130  * * [replayExpiration] &mdash; configures a delay between the stopping of
131  *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
132  *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
133  *   It defaults to [Duration.INFINITE] (keep replay cache forever, never reset buffer).
134  *   Use [Duration.ZERO] value to expire the cache immediately.
135  *
136  * This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration]
137  * are negative.
138  */
139 @Suppress("FunctionName")
140 @ExperimentalTime
141 public fun SharingStarted.Companion.WhileSubscribed(
142     stopTimeout: Duration = Duration.ZERO,
143     replayExpiration: Duration = Duration.INFINITE
144 ): SharingStarted =
145     StartedWhileSubscribed(stopTimeout.toLongMilliseconds(), replayExpiration.toLongMilliseconds())
146 
147 // -------------------------------- implementation --------------------------------
148 
149 private class StartedEagerly : SharingStarted {
150     override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
151         flowOf(SharingCommand.START)
152     override fun toString(): String = "SharingStarted.Eagerly"
153 }
154 
155 private class StartedLazily : SharingStarted {
<lambda>null156     override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
157         var started = false
158         subscriptionCount.collect { count ->
159             if (count > 0 && !started) {
160                 started = true
161                 emit(SharingCommand.START)
162             }
163         }
164     }
165 
toStringnull166     override fun toString(): String = "SharingStarted.Lazily"
167 }
168 
169 private class StartedWhileSubscribed(
170     private val stopTimeout: Long,
171     private val replayExpiration: Long
172 ) : SharingStarted {
173     init {
174         require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
175         require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
176     }
177 
178     override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
179         .transformLatest { count ->
180             if (count > 0) {
181                 emit(SharingCommand.START)
182             } else {
183                 delay(stopTimeout)
184                 if (replayExpiration > 0) {
185                     emit(SharingCommand.STOP)
186                     delay(replayExpiration)
187                 }
188                 emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
189             }
190         }
191         .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START
192         .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START
193 
194     @OptIn(ExperimentalStdlibApi::class)
195     override fun toString(): String {
196         val params = buildList(2) {
197             if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms")
198             if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms")
199         }
200         return "SharingStarted.WhileSubscribed(${params.joinToString()})"
201     }
202 
203     // equals & hashcode to facilitate testing, not documented in public contract
204     override fun equals(other: Any?): Boolean =
205         other is StartedWhileSubscribed &&
206             stopTimeout == other.stopTimeout &&
207             replayExpiration == other.replayExpiration
208 
209     override fun hashCode(): Int = stopTimeout.hashCode() * 31 + replayExpiration.hashCode()
210 }
211