• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.flow.*
9 import kotlinx.coroutines.internal.*
10 import kotlin.coroutines.*
11 import kotlin.jvm.*
12 
/**
 * Shared zero-length array of nullable [Continuation]s.
 *
 * NOTE(review): presumably handed out by [AbstractSharedFlowSlot.freeLocked] implementations that
 * have nothing to resume, so that no fresh array has to be allocated — confirm against the slot
 * implementations, which are not part of this file.
 */
@JvmField
internal val EMPTY_RESUMES: Array<Continuation<Unit>?> = arrayOfNulls(0)
15 
/**
 * A single collector slot of a shared flow of type [F].
 *
 * Both members carry the `Locked` suffix: the owning flow invokes them while it holds its own lock.
 */
internal abstract class AbstractSharedFlowSlot<F> {
    /**
     * Tries to claim this slot for a new collector of [flow].
     *
     * @return `true` when the slot was free and is now allocated; `false` tells the caller
     * to keep looking for another slot.
     */
    abstract fun allocateLocked(flow: F): Boolean

    /**
     * Releases this slot from [flow].
     *
     * @return the continuations that the caller has to resume once it has left the lock.
     */
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}
20 
/**
 * Base class for flows that keep one slot of type [S] per active collector.
 *
 * The instance itself is the lock (`synchronized(this)`): the slot array, the collector counter,
 * the search hint and the lazily created subscription counter are only written under that lock.
 */
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    // Slot storage; stays null until the first slot is requested.
    protected var slots: Array<S?>? = null
        private set

    // Number of slots that are currently allocated (i.e. not free).
    protected var nCollectors = 0
        private set

    // Hint: array position where the next search for a free slot starts.
    private var nextIndex = 0

    // Backing field of [subscriptionCount]; created on first access.
    private var _subscriptionCount: SubscriptionCountStateFlow? = null

    /**
     * Number of allocated slots, exposed as a [StateFlow].
     * The backing flow is created on first access and reused afterwards.
     */
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            // Created under the lock so that the initial value agrees with nCollectors.
            _subscriptionCount ?: SubscriptionCountStateFlow(nCollectors).also { created ->
                _subscriptionCount = created
            }
        }

    /** Creates a new, not yet allocated slot instance. */
    protected abstract fun createSlot(): S

    /** Creates an array of [size] slot references, all initially `null`. */
    protected abstract fun createSlotArray(size: Int): Array<S?>

    /**
     * Allocates a slot for a new collector and returns it.
     *
     * Under the lock: the slot array is created (capacity 2) or doubled when every position is
     * taken, then positions are probed circularly from [nextIndex], creating slot instances on
     * demand, until one accepts the allocation. After the lock is released, the subscription
     * counter (if it was already created) is incremented by one.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        val counter: SubscriptionCountStateFlow?
        val allocated = synchronized(this) {
            val existing = slots
            val table: Array<S?> = when {
                existing == null -> createSlotArray(2).also { slots = it }
                nCollectors >= existing.size -> existing.copyOf(2 * existing.size).also { slots = it }
                else -> existing
            }
            var cursor = nextIndex
            var candidate: S
            do {
                candidate = table[cursor] ?: createSlot().also { table[cursor] = it }
                // Advance (with wrap-around) before asking the slot, so that on success
                // the cursor already points past the slot that was just taken.
                cursor++
                if (cursor >= table.size) cursor = 0
            } while (!(candidate as AbstractSharedFlowSlot<Any>).allocateLocked(this))
            nextIndex = cursor
            nCollectors++
            counter = _subscriptionCount // read under the lock; null if never requested
            candidate
        }
        // The counter is updated outside of this object's lock.
        counter?.increment(1)
        return allocated
    }

    /**
     * Releases [slot].
     *
     * Under the lock: decrements [nCollectors], resets the search hint when no collectors remain,
     * and lets the slot free itself. Outside the lock: first resumes the continuations returned by
     * the slot, then decrements the subscription counter (if it was already created).
     */
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        val counter: SubscriptionCountStateFlow?
        val pending = synchronized(this) {
            nCollectors--
            // Start from position 0 again once nobody is collecting: more predictable behavior next time.
            if (nCollectors == 0) nextIndex = 0
            counter = _subscriptionCount // read under the lock; null if never requested
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
         * Wake up suspended coroutines.
         * The freed subscriber may have been a slow one that was holding up the buffer; with it gone,
         * previously queued emitters can proceed and are resumed here.
         */
        pending.forEach { continuation -> continuation?.resume(Unit) }
        counter?.increment(-1)
    }

    /**
     * Invokes [block] for every non-null entry of the slot array; does nothing when there are no
     * collectors or the array was never created. The `Locked` suffix signals that the caller is
     * expected to hold the lock — this function does not acquire it.
     */
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        val table = slots ?: return
        for (entry in table) {
            if (entry != null) block(entry)
        }
    }
}
100 
/**
 * [StateFlow] that holds the number of subscriptions.
 *
 * In the public API it is exposed as a regular [StateFlow], but under the hood it is implemented as
 * a [SharedFlow] so that consecutive updates are never conflated: the subscription count is very
 * sensitive to conflation.
 *
 * Why conflation must be avoided is demonstrated by the following example:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * `SharingStarted.Lazily` would never get a chance to start the source: `first` sees the initial
 * value and immediately unsubscribes, leaving the asynchronous `SharingStarted` with a conflated zero.
 *
 * To avoid that (especially in more complex scenarios), subscription updates are not conflated.
 */
private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
    // NOTE(review): the arguments are presumably replay = 1 and buffer capacity = Int.MAX_VALUE —
    // confirm against the SharedFlowImpl constructor.
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST)
{
    // Publish the starting count right away.
    init {
        tryEmit(initialValue)
    }

    /** The most recently emitted count, read under this flow's lock. */
    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }

    /**
     * Emits the current count changed by [delta] (which may be negative).
     * Reading the current count and emitting the new one happen under this flow's lock;
     * the result of `tryEmit` is returned.
     */
    fun increment(delta: Int) = synchronized(this) {
        val updated = lastReplayedLocked + delta
        tryEmit(updated)
    }
}
132