• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.flow.internal
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.*
6 import kotlinx.coroutines.internal.*
7 import kotlin.coroutines.*
8 import kotlin.jvm.*
9 
/**
 * Shared zero-length array of nullable `Continuation<Unit>` references.
 *
 * Its type matches the return type of [AbstractSharedFlowSlot.freeLocked], so it can presumably be returned
 * from there when there is nothing to resume (usages are outside this part of the file — confirm).
 */
@JvmField
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
12 
/**
 * A single collector slot of a shared flow of type [F].
 *
 * Both functions carry the `Locked` suffix: [AbstractSharedFlow] invokes them while holding its own lock.
 */
internal abstract class AbstractSharedFlowSlot<F> {
    /**
     * Tries to claim this slot for [flow].
     *
     * @return `true` when the slot was free and is now allocated, `false` when the caller should try another slot.
     */
    abstract fun allocateLocked(flow: F): Boolean

    /**
     * Releases this slot from [flow].
     *
     * @return continuations that the caller must resume after it has released the lock.
     */
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}
17 
/**
 * Base class for shared flows that keeps track of collector slots of type [S].
 *
 * Slots live in a lazily allocated array that is doubled when every entry is in use. Allocation and release
 * happen under `synchronized(this)`, and the number of allocated slots is mirrored into [subscriptionCount]
 * once that property has been requested for the first time.
 */
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    /** Slot storage; `null` until the first slot is allocated. Entries are created on demand and then kept. */
    protected var slots: Array<S?>? = null
        private set

    /** Number of currently allocated (not free) slots. */
    protected var nCollectors = 0
        private set

    // Hint for the index at which the next search for a free slot starts
    private var nextIndex = 0

    // Created on the first access to subscriptionCount
    private var _subscriptionCount: SubscriptionCountStateFlow? = null

    /**
     * The number of allocated slots as a [StateFlow].
     *
     * The backing flow is created on first access, under the lock, seeded with the current [nCollectors].
     */
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            // allocate under lock in sync with nCollectors variable
            _subscriptionCount ?: SubscriptionCountStateFlow(nCollectors).also {
                _subscriptionCount = it
            }
        }

    /** Creates a new, not yet allocated slot instance. */
    protected abstract fun createSlot(): S

    /** Creates an array of [size] `null` entries to hold slots. */
    protected abstract fun createSlotArray(size: Int): Array<S?>

    /**
     * Allocates a slot for a new collector and returns it.
     *
     * The slot is found (or created) and claimed under the lock; the subscription count, if it has been
     * initialized, is incremented after the lock is released.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        // Assigned under the lock, used after it is released
        val countFlow: SubscriptionCountStateFlow?
        val slot = synchronized(this) {
            // Make sure there is an array and that it has room: start with 2 entries, double when full
            val slotArray = when (val curSlots = slots) {
                null -> createSlotArray(2).also { slots = it }
                else -> if (nCollectors >= curSlots.size) {
                    curSlots.copyOf(2 * curSlots.size).also { slots = it }
                } else {
                    curSlots
                }
            }
            // Scan circularly from the remembered index; the array was grown above whenever it was full,
            // so a free entry is expected to exist
            var index = nextIndex
            var candidate: S
            while (true) {
                candidate = slotArray[index] ?: createSlot().also { slotArray[index] = it }
                index++
                if (index >= slotArray.size) index = 0
                // break when found and allocated free slot
                if ((candidate as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break
            }
            nextIndex = index
            nCollectors++
            countFlow = _subscriptionCount // retrieve under lock if initialized
            candidate
        }
        // increments subscription count
        countFlow?.increment(1)
        return slot
    }

    /**
     * Releases [slot].
     *
     * The slot is freed under the lock. After the lock is released, the continuations returned by
     * [AbstractSharedFlowSlot.freeLocked] are resumed and the subscription count, if initialized, is decremented.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        // Assigned under the lock, used after it is released
        val countFlow: SubscriptionCountStateFlow?
        val resumes = synchronized(this) {
            nCollectors--
            countFlow = _subscriptionCount // retrieve under lock if initialized
            // Reset next index oracle if we have no more active collectors for more predictable behavior next time
            if (nCollectors == 0) nextIndex = 0
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
         * Resume suspended coroutines.
         * This can happen when the subscriber that was freed was a slow one and was holding up the buffer.
         * When this subscriber was freed, previously queued emitters can now wake up and are resumed here.
         */
        for (cont in resumes) cont?.resume(Unit)
        // decrement subscription count
        countFlow?.increment(-1)
    }

    /**
     * Invokes [block] on every non-null entry of [slots]; does nothing when [nCollectors] is zero.
     *
     * This function does not take the lock itself; per the `Locked` suffix the caller is expected to hold it.
     */
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        slots?.forEach { slot ->
            if (slot != null) block(slot)
        }
    }
}
97 
/**
 * [StateFlow] that represents the number of subscriptions.
 *
 * It is exposed as a regular [StateFlow] in our public API, but it is implemented as a [SharedFlow] under the hood
 * to avoid conflation of consecutive updates, because the subscription count is very sensitive to it.
 *
 * The importance of not conflating can be demonstrated with the following example:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately
 * unsubscribes, leaving the asynchronous `SharingStarted` with a conflated zero.
 *
 * To avoid that (especially in more complex scenarios), we do not conflate subscription updates.
 */
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
    // NOTE(review): arguments are presumably replay = 1 plus an effectively unbounded extra buffer —
    // confirm against the SharedFlowImpl constructor
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST)
{
    // Seed the flow so that a value is available right after construction
    init { tryEmit(initialValue) }

    /** The most recently emitted count, read under the lock. */
    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }

    /**
     * Emits the current count changed by [delta]; the read and the emission happen under the same lock.
     * Returns whatever `tryEmit` returns.
     */
    fun increment(delta: Int) = synchronized(this) {
        tryEmit(lastReplayedLocked + delta)
    }
}
130