• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12 
13 @JvmField
14 @SharedImmutable
15 internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
16 
17 internal abstract class AbstractSharedFlowSlot<F> {
18     abstract fun allocateLocked(flow: F): Boolean
19     abstract fun freeLocked(flow: F): Array<Continuation<Unit>?> // returns continuations to resume after lock
20 }
21 
22 internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
23     @Suppress("UNCHECKED_CAST")
24     protected var slots: Array<S?>? = null // allocated when needed
25         private set
26     protected var nCollectors = 0 // number of allocated (!free) slots
27         private set
28     private var nextIndex = 0 // oracle for the next free slot index
29     private var _subscriptionCount: MutableStateFlow<Int>? = null // init on first need
30 
31     val subscriptionCount: StateFlow<Int>
<lambda>null32         get() = synchronized(this) {
33             // allocate under lock in sync with nCollectors variable
34             _subscriptionCount ?: MutableStateFlow(nCollectors).also {
35                 _subscriptionCount = it
36             }
37         }
38 
createSlotnull39     protected abstract fun createSlot(): S
40 
41     protected abstract fun createSlotArray(size: Int): Array<S?>
42 
43     @Suppress("UNCHECKED_CAST")
44     protected fun allocateSlot(): S {
45         // Actually create slot under lock
46         var subscriptionCount: MutableStateFlow<Int>? = null
47         val slot = synchronized(this) {
48             val slots = when (val curSlots = slots) {
49                 null -> createSlotArray(2).also { slots = it }
50                 else -> if (nCollectors >= curSlots.size) {
51                     curSlots.copyOf(2 * curSlots.size).also { slots = it }
52                 } else {
53                     curSlots
54                 }
55             }
56             var index = nextIndex
57             var slot: S
58             while (true) {
59                 slot = slots[index] ?: createSlot().also { slots[index] = it }
60                 index++
61                 if (index >= slots.size) index = 0
62                 if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
63             }
64             nextIndex = index
65             nCollectors++
66             subscriptionCount = _subscriptionCount // retrieve under lock if initialized
67             slot
68         }
69         // increments subscription count
70         subscriptionCount?.increment(1)
71         return slot
72     }
73 
74     @Suppress("UNCHECKED_CAST")
freeSlotnull75     protected fun freeSlot(slot: S) {
76         // Release slot under lock
77         var subscriptionCount: MutableStateFlow<Int>? = null
78         val resumes = synchronized(this) {
79             nCollectors--
80             subscriptionCount = _subscriptionCount // retrieve under lock if initialized
81             // Reset next index oracle if we have no more active collectors for more predictable behavior next time
82             if (nCollectors == 0) nextIndex = 0
83             (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
84         }
85         /*
86            Resume suspended coroutines.
87            This can happens when the subscriber that was freed was a slow one and was holding up buffer.
88            When this subscriber was freed, previously queued emitted can now wake up and are resumed here.
89         */
90         for (cont in resumes) cont?.resume(Unit)
91         // decrement subscription count
92         subscriptionCount?.increment(-1)
93     }
94 
forEachSlotLockednull95     protected inline fun forEachSlotLocked(block: (S) -> Unit) {
96         if (nCollectors == 0) return
97         slots?.forEach { slot ->
98             if (slot != null) block(slot)
99         }
100     }
101 }