/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Shared zero-length array of nullable `Continuation<Unit>` references.
 *
 * NOTE(review): presumably returned by [AbstractSharedFlowSlot.freeLocked] implementations that have
 * nothing to resume, so that no new array has to be allocated — confirm against the implementations.
 */
@JvmField
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)

/**
 * A single collector slot owned by a shared flow of type [F].
 *
 * Both functions are invoked by [AbstractSharedFlow] from inside its `synchronized(this)` block,
 * with the flow itself passed as the argument.
 */
internal abstract class AbstractSharedFlowSlot<F> {
    /**
     * Tries to claim this slot for a new collector of [flow].
     *
     * @return `true` if the slot was free and is now allocated; `false` tells the caller
     * ([AbstractSharedFlow.allocateSlot]) to keep looking at the next slot.
     */
    abstract fun allocateLocked(flow: F): Boolean

    /**
     * Releases this slot from [flow].
     *
     * @return continuations that the caller must resume after it has left the lock; entries may be `null`.
     */
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock
}

/**
 * Base class that manages a growable array of collector slots of type [S] together with a lazily created
 * subscription counter.
 *
 * The functions of this class that mutate its state ([allocateSlot], [freeSlot] and the [subscriptionCount]
 * getter) do so inside `synchronized(this)`.
 */
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    protected var slots: Array<S?>? = null // allocated when needed
        private set
    protected var nCollectors = 0 // number of allocated (!free) slots
        private set
    private var nextIndex = 0 // oracle for the next free slot index
    private var _subscriptionCount: SubscriptionCountStateFlow? = null // init on first need

    /**
     * The number of currently allocated slots, exposed as a [StateFlow].
     *
     * The backing flow is created on first access, under the lock, and is seeded with the current
     * value of [nCollectors]; later accesses return the same instance.
     */
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            // allocate under lock in sync with nCollectors variable
            _subscriptionCount ?: SubscriptionCountStateFlow(nCollectors).also {
                _subscriptionCount = it
            }
        }

    /** Creates a new, not yet allocated slot. Called by [allocateSlot] while holding the lock. */
    protected abstract fun createSlot(): S

    /** Creates an array of [size] `null` slot references. Called by [allocateSlot] while holding the lock. */
    protected abstract fun createSlotArray(size: Int): Array<S?>

    /**
     * Allocates a slot for a new collector and returns it.
     *
     * Under the lock this function:
     * 1. creates the slot array with two entries on first use, or doubles it when every entry is allocated;
     * 2. scans the array circularly, starting at the remembered index and creating missing slots on the way,
     *    until [AbstractSharedFlowSlot.allocateLocked] succeeds;
     * 3. remembers the index following the allocated slot and increments [nCollectors].
     *
     * The subscription counter, if it has already been created, is incremented after the lock is released.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        // Actually create slot under lock
        val subscriptionCount: SubscriptionCountStateFlow?
        val slot = synchronized(this) {
            val slots = when (val curSlots = slots) {
                null -> createSlotArray(2).also { slots = it }
                else -> if (nCollectors >= curSlots.size) {
                    // no spare entry left -- grow the array, new entries are null and filled lazily below
                    curSlots.copyOf(2 * curSlots.size).also { slots = it }
                } else {
                    curSlots
                }
            }
            var index = nextIndex
            var slot: S
            // NOTE(review): termination relies on at least one entry accepting allocation, which the
            // growth step above is meant to guarantee -- confirm against allocateLocked implementations.
            while (true) {
                slot = slots[index] ?: createSlot().also { slots[index] = it }
                index++
                if (index >= slots.size) index = 0 // wrap around
                if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
            }
            nextIndex = index
            nCollectors++
            subscriptionCount = _subscriptionCount // retrieve under lock if initialized
            slot
        }
        // increments subscription count
        subscriptionCount?.increment(1)
        return slot
    }

    /**
     * Releases [slot], which is expected to have been obtained from [allocateSlot].
     *
     * Under the lock [nCollectors] is decremented and [AbstractSharedFlowSlot.freeLocked] is invoked.
     * The continuations it returns are resumed with [Unit] after the lock is released, and only then is
     * the subscription counter (if it has already been created) decremented.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        // Release slot under lock
        val subscriptionCount: SubscriptionCountStateFlow?
        val resumes = synchronized(this) {
            nCollectors--
            subscriptionCount = _subscriptionCount // retrieve under lock if initialized
            // Reset next index oracle if we have no more active collectors for more predictable behavior next time
            if (nCollectors == 0) nextIndex = 0
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
         * Resume suspended coroutines.
         * This can happen when the subscriber that was freed was a slow one and was holding up buffer.
         * When this subscriber was freed, previously queued emitters can now wake up and are resumed here.
         */
        for (cont in resumes) cont?.resume(Unit)
        // decrement subscription count
        subscriptionCount?.increment(-1)
    }

    /**
     * Invokes [block] on every non-null entry of the slot array, or does nothing when [nCollectors] is zero.
     *
     * Entries are visited whether or not they are currently allocated. This function does not take the lock
     * itself; as the `Locked` suffix suggests, the caller is expected to already hold it.
     */
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        slots?.forEach { slot ->
            if (slot != null) block(slot)
        }
    }
}

/**
 * [StateFlow] that represents the number of subscriptions.
 *
 * It is exposed as a regular [StateFlow] in our public API, but it is implemented as [SharedFlow] undercover to
 * avoid conflations of consecutive updates because the subscription count is very sensitive to it.
 *
 * The importance of non-conflating can be demonstrated with the following example:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * the `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately
 * unsubscribes, leaving the asynchronous `SharingStarted` with conflated zero.
 *
 * To avoid that (especially in more complex scenarios), we do not conflate subscription updates.
 */
private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST)
{
    // Seed the flow so that a value is available right after construction.
    init { tryEmit(initialValue) }

    /** The current count, read under this flow's own lock. */
    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }

    /**
     * Emits the current count plus [delta]; the read and the emit happen under a single lock acquisition
     * so concurrent increments cannot interleave between them.
     */
    fun increment(delta: Int) = synchronized(this) {
        tryEmit(lastReplayedLocked + delta)
    }
}