/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.IgnoreJreRequirement
import kotlin.time.*

/**
 * A command emitted by [SharingStarted] implementations to control the sharing coroutine in
 * the [shareIn] and [stateIn] operators.
 */
public enum class SharingCommand {
    /**
     * Starts sharing, launching collection of the upstream flow.
     *
     * Emitting this command again does not do anything. Emit [STOP] and then [START] to restart an
     * upstream flow.
     */
    START,

    /**
     * Stops sharing, cancelling collection of the upstream flow.
     */
    STOP,

    /**
     * Stops sharing, cancelling collection of the upstream flow, and resets the [SharedFlow.replayCache]
     * to its initial state.
     * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
     * the [stateIn] operator resets the value to its original `initialValue`.
     */
    STOP_AND_RESET_REPLAY_CACHE
}

/**
 * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators.
 *
 * This functional interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and
 * supports custom strategies by implementing this interface's [command] function.
 *
 * For example, it is possible to define a custom strategy that starts the upstream only when the number
 * of subscribers reaches the given `threshold` and make it an extension on [SharingStarted.Companion] so
 * that it looks like a built-in strategy on the use-site:
 *
 * ```
 * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) =
 *     SharingStarted { subscriptionCount: StateFlow<Int> ->
 *         subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP }
 *     }
 * ```
 *
 * ### Commands
 *
 * The `SharingStarted` strategy works by emitting [commands][SharingCommand] that control upstream flow from its
 * [`command`][command] flow implementation function. Back-to-back emissions of the same command have no effect.
 * Only emission of a different command has effect:
 *
 * * [START][SharingCommand.START] — the upstream flow is started.
 * * [STOP][SharingCommand.STOP] — the upstream flow is stopped.
 * * [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] —
 *   the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state.
 *   The [shareIn] operator calls [MutableSharedFlow.resetReplayCache];
 *   the [stateIn] operator resets the value to its original `initialValue`.
 *
 * Initially, the upstream flow is stopped and is in the initial state, so the emission of additional
 * [STOP][SharingCommand.STOP] and [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] commands will
 * have no effect.
 *
 * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
 * The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
 */
public fun interface SharingStarted {
    public companion object {
        /**
         * Sharing is started immediately and never stops.
         */
        public val Eagerly: SharingStarted = StartedEagerly()

        /**
         * Sharing is started when the first subscriber appears and never stops.
         */
        public val Lazily: SharingStarted = StartedLazily()

        /**
         * Sharing is started when the first subscriber appears, immediately stops when the last
         * subscriber disappears (by default), keeping the replay cache forever (by default).
         *
         * It has the following optional parameters:
         *
         * * [stopTimeoutMillis] — configures a delay (in milliseconds) between the disappearance of the last
         *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
         * * [replayExpirationMillis] — configures a delay (in milliseconds) between the stopping of
         *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
         *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
         *   It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset it).
         *   Use zero value to expire the cache immediately.
         *
         * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis]
         * are negative.
         */
        @Suppress("FunctionName")
        public fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted =
            StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
    }

    /**
     * Transforms the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow into the
     * flow of [commands][SharingCommand] that control the sharing coroutine. See the [SharingStarted] interface
     * documentation for details.
     */
    public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

/**
 * Sharing is started when the first subscriber appears, immediately stops when the last
 * subscriber disappears (by default), keeping the replay cache forever (by default).
 *
 * It has the following optional parameters:
 *
 * * [stopTimeout] — configures a delay between the disappearance of the last
 *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
 * * [replayExpiration] — configures a delay between the stopping of
 *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
 *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
 *   It defaults to [Duration.INFINITE] (keep replay cache forever, never reset it).
 *   Use [Duration.ZERO] value to expire the cache immediately.
 *
 * Both durations are converted to whole milliseconds, so any sub-millisecond part is discarded.
 *
 * This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration]
 * are negative.
 */
@Suppress("FunctionName")
public fun SharingStarted.Companion.WhileSubscribed(
    stopTimeout: Duration = Duration.ZERO,
    replayExpiration: Duration = Duration.INFINITE
): SharingStarted =
    StartedWhileSubscribed(
        stopTimeout.toNonNegativeMillis("stopTimeout"),
        replayExpiration.toNonNegativeMillis("replayExpiration")
    )

// -------------------------------- implementation --------------------------------

/**
 * Converts this duration to whole milliseconds, rejecting negative values first.
 *
 * The sign is checked on the [Duration] itself rather than on the converted value: a negative duration
 * shorter than one millisecond converts to `0` and would otherwise slip past the millisecond-based
 * validation in [StartedWhileSubscribed].
 *
 * @param name the parameter name to report in the exception message.
 * @throws IllegalArgumentException if this duration is negative.
 */
private fun Duration.toNonNegativeMillis(name: String): Long {
    require(!isNegative()) { "$name($this) cannot be negative" }
    return inWholeMilliseconds
}

/**
 * Implementation of [SharingStarted.Eagerly]: emits a single [SharingCommand.START] regardless of
 * the subscription count.
 */
private class StartedEagerly : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        flowOf(SharingCommand.START)

    override fun toString(): String = "SharingStarted.Eagerly"
}

/**
 * Implementation of [SharingStarted.Lazily]: emits [SharingCommand.START] once, the first time
 * the subscription count becomes positive, and never emits anything else.
 */
private class StartedLazily : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
        var started = false // guards against emitting START more than once
        subscriptionCount.collect { count ->
            if (count > 0 && !started) {
                started = true
                emit(SharingCommand.START)
            }
        }
    }

    override fun toString(): String = "SharingStarted.Lazily"
}

/**
 * Implementation of the `WhileSubscribed` strategies.
 *
 * @param stopTimeout delay in milliseconds between the subscription count dropping to zero and stopping.
 * @param replayExpiration delay in milliseconds between stopping and resetting the replay cache.
 * @throws IllegalArgumentException if either parameter is negative.
 */
private class StartedWhileSubscribed(
    private val stopTimeout: Long,
    private val replayExpiration: Long
) : SharingStarted {
    init {
        require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" }
        require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" }
    }

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
        // transformLatest restarts this block for every new count, so a subscriber that shows up
        // during one of the delays below abandons the pending STOP/reset sequence.
        .transformLatest { count ->
            if (count > 0) {
                emit(SharingCommand.START)
            } else {
                delay(stopTimeout)
                // With a zero replayExpiration the intermediate STOP is skipped and
                // the cache is reset right away.
                if (replayExpiration > 0) {
                    emit(SharingCommand.STOP)
                    delay(replayExpiration)
                }
                emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
            }
        }
        .dropWhile { it != SharingCommand.START } // don't emit any STOP/STOP_AND_RESET_REPLAY_CACHE to start with, only START
        .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START

    @OptIn(ExperimentalStdlibApi::class)
    override fun toString(): String {
        // Only parameters that differ from their defaults (0 and Long.MAX_VALUE) are printed.
        val params = buildList(2) {
            if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms")
            if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms")
        }
        return "SharingStarted.WhileSubscribed(${params.joinToString()})"
    }

    // equals & hashcode to facilitate testing, not documented in public contract
    override fun equals(other: Any?): Boolean =
        other is StartedWhileSubscribed &&
            stopTimeout == other.stopTimeout &&
            replayExpiration == other.replayExpiration

    @IgnoreJreRequirement // desugared hashcode implementation
    override fun hashCode(): Int = stopTimeout.hashCode() * 31 + replayExpiration.hashCode()
}