1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.flow 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.flow.internal.* 9 import kotlin.time.* 10 11 /** 12 * A command emitted by [SharingStarted] implementations to control the sharing coroutine in 13 * the [shareIn] and [stateIn] operators. 14 */ 15 public enum class SharingCommand { 16 /** 17 * Starts sharing, launching collection of the upstream flow. 18 * 19 * Emitting this command again does not do anything. Emit [STOP] and then [START] to restart an 20 * upstream flow. 21 */ 22 START, 23 24 /** 25 * Stops sharing, cancelling collection of the upstream flow. 26 */ 27 STOP, 28 29 /** 30 * Stops sharing, cancelling collection of the upstream flow, and resets the [SharedFlow.replayCache] 31 * to its initial state. 32 * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache]; 33 * the [stateIn] operator resets the value to its original `initialValue`. 34 */ 35 STOP_AND_RESET_REPLAY_CACHE 36 } 37 38 /** 39 * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators. 40 * 41 * This interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and 42 * supports custom strategies by implementing this interface's [command] function. 43 * 44 * For example, it is possible to define a custom strategy that starts the upstream only when the number 45 * of subscribers exceeds the given `threshold` and make it an extension on [SharingStarted.Companion] so 46 * that it looks like a built-in strategy on the use-site: 47 * 48 * ``` 49 * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted = 50 * object : SharingStarted { 51 * override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = 52 * subscriptionCount 53 * .map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP } 54 * } 55 * ``` 56 * 57 * ### Commands 58 * 59 * The `SharingStarted` strategy works by emitting [commands][SharingCommand] that control upstream flow from its 60 * [`command`][command] flow implementation function. Back-to-back emissions of the same command have no effect. 61 * Only emission of a different command has effect: 62 * 63 * * [START][SharingCommand.START] — the upstream flow is stared. 64 * * [STOP][SharingCommand.STOP] — the upstream flow is stopped. 65 * * [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] — 66 * the upstream flow is stopped and the [SharedFlow.replayCache] is reset to its initial state. 67 * The [shareIn] operator calls [MutableSharedFlow.resetReplayCache]; 68 * the [stateIn] operator resets the value to its original `initialValue`. 69 * 70 * Initially, the upstream flow is stopped and is in the initial state, so the emission of additional 71 * [STOP][SharingCommand.STOP] and [STOP_AND_RESET_REPLAY_CACHE][SharingCommand.STOP_AND_RESET_REPLAY_CACHE] commands will 72 * have no effect. 73 * 74 * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running). 75 * The failure of the `command` flow cancels the sharing coroutine and the upstream flow. 76 */ 77 public interface SharingStarted { 78 public companion object { 79 /** 80 * Sharing is started immediately and never stops. 81 */ 82 public val Eagerly: SharingStarted = StartedEagerly() 83 84 /** 85 * Sharing is started when the first subscriber appears and never stops. 86 */ 87 public val Lazily: SharingStarted = StartedLazily() 88 89 /** 90 * Sharing is started when the first subscriber appears, immediately stops when the last 91 * subscriber disappears (by default), keeping the replay cache forever (by default). 92 * 93 * It has the following optional parameters: 94 * 95 * * [stopTimeoutMillis] — configures a delay (in milliseconds) between the disappearance of the last 96 * subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately). 97 * * [replayExpirationMillis] — configures a delay (in milliseconds) between the stopping of 98 * the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator 99 * and resets the cached value to the original `initialValue` for the [stateIn] operator). 100 * It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset buffer). 101 * Use zero value to expire the cache immediately. 102 * 103 * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis] 104 * are negative. 105 */ 106 @Suppress("FunctionName") WhileSubscribednull107 public fun WhileSubscribed( 108 stopTimeoutMillis: Long = 0, 109 replayExpirationMillis: Long = Long.MAX_VALUE 110 ): SharingStarted = 111 StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis) 112 } 113 114 /** 115 * Transforms the [subscriptionCount][MutableSharedFlow.subscriptionCount] state of the shared flow into the 116 * flow of [commands][SharingCommand] that control the sharing coroutine. See the [SharingStarted] interface 117 * documentation for details. 118 */ 119 public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> 120 } 121 122 /** 123 * Sharing is started when the first subscriber appears, immediately stops when the last 124 * subscriber disappears (by default), keeping the replay cache forever (by default). 125 * 126 * It has the following optional parameters: 127 * 128 * * [stopTimeout] — configures a delay between the disappearance of the last 129 * subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately). 130 * * [replayExpiration] — configures a delay between the stopping of 131 * the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator 132 * and resets the cached value to the original `initialValue` for the [stateIn] operator). 133 * It defaults to [Duration.INFINITE] (keep replay cache forever, never reset buffer). 134 * Use [Duration.ZERO] value to expire the cache immediately. 135 * 136 * This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration] 137 * are negative. 138 */ 139 @Suppress("FunctionName") 140 @ExperimentalTime 141 public fun SharingStarted.Companion.WhileSubscribed( 142 stopTimeout: Duration = Duration.ZERO, 143 replayExpiration: Duration = Duration.INFINITE 144 ): SharingStarted = 145 StartedWhileSubscribed(stopTimeout.toLongMilliseconds(), replayExpiration.toLongMilliseconds()) 146 147 // -------------------------------- implementation -------------------------------- 148 149 private class StartedEagerly : SharingStarted { 150 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = 151 flowOf(SharingCommand.START) 152 override fun toString(): String = "SharingStarted.Eagerly" 153 } 154 155 private class StartedLazily : SharingStarted { <lambda>null156 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow { 157 var started = false 158 subscriptionCount.collect { count -> 159 if (count > 0 && !started) { 160 started = true 161 emit(SharingCommand.START) 162 } 163 } 164 } 165 toStringnull166 override fun toString(): String = "SharingStarted.Lazily" 167 } 168 169 private class StartedWhileSubscribed( 170 private val stopTimeout: Long, 171 private val replayExpiration: Long 172 ) : SharingStarted { 173 init { 174 require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" } 175 require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" } 176 } 177 178 override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount 179 .transformLatest { count -> 180 if (count > 0) { 181 emit(SharingCommand.START) 182 } else { 183 delay(stopTimeout) 184 if (replayExpiration > 0) { 185 emit(SharingCommand.STOP) 186 delay(replayExpiration) 187 } 188 emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE) 189 } 190 } 191 .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START 192 .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START 193 194 @OptIn(ExperimentalStdlibApi::class) 195 override fun toString(): String { 196 val params = buildList(2) { 197 if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms") 198 if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms") 199 } 200 return "SharingStarted.WhileSubscribed(${params.joinToString()})" 201 } 202 203 // equals & hashcode to facilitate testing, not documented in public contract 204 override fun equals(other: Any?): Boolean = 205 other is StartedWhileSubscribed && 206 stopTimeout == other.stopTimeout && 207 replayExpiration == other.replayExpiration 208 209 override fun hashCode(): Int = stopTimeout.hashCode() * 31 + replayExpiration.hashCode() 210 } 211