• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 
10 /**
11  * @suppress **This an internal API and should not be used from general code.**
12  */
13 @InternalCoroutinesApi
14 public interface ThreadSafeHeapNode {
15     public var heap: ThreadSafeHeap<*>?
16     public var index: Int
17 }
18 
19 /**
20  * Synchronized binary heap.
21  * @suppress **This an internal API and should not be used from general code.**
22  */
23 @InternalCoroutinesApi
24 public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNode, T: Comparable<T> {
25     private var a: Array<T?>? = null
26 
27     private val _size = atomic(0)
28 
29     public var size: Int
30         get() = _size.value
31         private set(value) { _size.value = value }
32 
33     public val isEmpty: Boolean get() = size == 0
34 
<lambda>null35     public fun clear(): Unit = synchronized(this) {
36         a?.fill(null)
37         _size.value = 0
38     }
39 
<lambda>null40     public fun peek(): T? = synchronized(this) { firstImpl() }
41 
<lambda>null42     public fun removeFirstOrNull(): T? = synchronized(this) {
43         if (size > 0) {
44             removeAtImpl(0)
45         } else {
46             null
47         }
48     }
49 
50     // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
<lambda>null51     public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
52         val first = firstImpl() ?: return null
53         if (predicate(first)) {
54             removeAtImpl(0)
55         } else {
56             null
57         }
58     }
59 
<lambda>null60     public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }
61 
62     // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
63     // Condition also receives current first node in the heap
<lambda>null64     public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
65         if (cond(firstImpl())) {
66             addImpl(node)
67             true
68         } else {
69             false
70         }
71     }
72 
<lambda>null73     public fun remove(node: T): Boolean = synchronized(this) {
74         return if (node.heap == null) {
75             false
76         } else {
77             val index = node.index
78             assert { index >= 0 }
79             removeAtImpl(index)
80             true
81         }
82     }
83 
84     @PublishedApi
firstImplnull85     internal fun firstImpl(): T? = a?.get(0)
86 
87     @PublishedApi
88     internal fun removeAtImpl(index: Int): T {
89         assert { size > 0 }
90         val a = this.a!!
91         size--
92         if (index < size) {
93             swap(index, size)
94             val j = (index - 1) / 2
95             if (index > 0 && a[index]!! < a[j]!!) {
96                 swap(index, j)
97                 siftUpFrom(j)
98             } else {
99                 siftDownFrom(index)
100             }
101         }
102         val result = a[size]!!
103         assert { result.heap === this }
104         result.heap = null
105         result.index = -1
106         a[size] = null
107         return result
108     }
109 
110     @PublishedApi
addImplnull111     internal fun addImpl(node: T) {
112         assert { node.heap == null }
113         node.heap = this
114         val a = realloc()
115         val i = size++
116         a[i] = node
117         node.index = i
118         siftUpFrom(i)
119     }
120 
siftUpFromnull121     private tailrec fun siftUpFrom(i: Int) {
122         if (i <= 0) return
123         val a = a!!
124         val j = (i - 1) / 2
125         if (a[j]!! <= a[i]!!) return
126         swap(i, j)
127         siftUpFrom(j)
128     }
129 
siftDownFromnull130     private tailrec fun siftDownFrom(i: Int) {
131         var j = 2 * i + 1
132         if (j >= size) return
133         val a = a!!
134         if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
135         if (a[i]!! <= a[j]!!) return
136         swap(i, j)
137         siftDownFrom(j)
138     }
139 
140     @Suppress("UNCHECKED_CAST")
reallocnull141     private fun realloc(): Array<T?> {
142         val a = this.a
143         return when {
144             a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it }
145             size >= a.size -> a.copyOf(size * 2).also { this.a = it }
146             else -> a
147         }
148     }
149 
swapnull150     private fun swap(i: Int, j: Int) {
151         val a = a!!
152         val ni = a[j]!!
153         val nj = a[i]!!
154         a[i] = ni
155         a[j] = nj
156         ni.index = i
157         nj.index = j
158     }
159 }
160