1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.flow.internal 6 7 import kotlinx.coroutines.flow.* 8 import kotlinx.coroutines.internal.* 9 import kotlin.coroutines.* 10 import kotlin.jvm.* 11 import kotlin.native.concurrent.* 12 13 @JvmField 14 @SharedImmutable 15 internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0) 16 17 internal abstract class AbstractSharedFlowSlot<F> { 18 abstract fun allocateLocked(flow: F): Boolean 19 abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock 20 } 21 22 internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() { 23 @Suppress("UNCHECKED_CAST") 24 protected var slots: Array<S?>? = null // allocated when needed 25 private set 26 protected var nCollectors = 0 // number of allocated (!free) slots 27 private set 28 private var nextIndex = 0 // oracle for the next free slot index 29 private var _subscriptionCount: MutableStateFlow<Int>? = null // init on first need 30 31 val subscriptionCount: StateFlow<Int> <lambda>null32 get() = synchronized(this) { 33 // allocate under lock in sync with nCollectors variable 34 _subscriptionCount ?: MutableStateFlow(nCollectors).also { 35 _subscriptionCount = it 36 } 37 } 38 createSlotnull39 protected abstract fun createSlot(): S 40 41 protected abstract fun createSlotArray(size: Int): Array<S?> 42 43 @Suppress("UNCHECKED_CAST") 44 protected fun allocateSlot(): S { 45 // Actually create slot under lock 46 var subscriptionCount: MutableStateFlow<Int>? = null 47 val slot = synchronized(this) { 48 val slots = when (val curSlots = slots) { 49 null -> createSlotArray(2).also { slots = it } 50 else -> if (nCollectors >= curSlots.size) { 51 curSlots.copyOf(2 * curSlots.size).also { slots = it } 52 } else { 53 curSlots 54 } 55 } 56 var index = nextIndex 57 var slot: S 58 while (true) { 59 slot = slots[index] ?: createSlot().also { slots[index] = it } 60 index++ 61 if (index >= slots.size) index = 0 62 if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot 63 } 64 nextIndex = index 65 nCollectors++ 66 subscriptionCount = _subscriptionCount // retrieve under lock if initialized 67 slot 68 } 69 // increments subscription count 70 subscriptionCount?.increment(1) 71 return slot 72 } 73 74 @Suppress("UNCHECKED_CAST") freeSlotnull75 protected fun freeSlot(slot: S) { 76 // Release slot under lock 77 var subscriptionCount: MutableStateFlow<Int>? = null 78 val resumes = synchronized(this) { 79 nCollectors-- 80 subscriptionCount = _subscriptionCount // retrieve under lock if initialized 81 // Reset next index oracle if we have no more active collectors for more predictable behavior next time 82 if (nCollectors == 0) nextIndex = 0 83 (slot as AbstractSharedFlowSlot<Any>).freeLocked(this) 84 } 85 /* 86 Resume suspended coroutines. 87 This can happens when the subscriber that was freed was a slow one and was holding up buffer. 88 When this subscriber was freed, previously queued emitted can now wake up and are resumed here. 89 */ 90 for (cont in resumes) cont?.resume(Unit) 91 // decrement subscription count 92 subscriptionCount?.increment(-1) 93 } 94 forEachSlotLockednull95 protected inline fun forEachSlotLocked(block: (S) -> Unit) { 96 if (nCollectors == 0) return 97 slots?.forEach { slot -> 98 if (slot != null) block(slot) 99 } 100 } 101 }