• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 
/**
 * A node that can be stored in a [ThreadSafeHeap].
 *
 * Both properties are bookkeeping fields written by the heap itself while the node is
 * inserted, moved, or removed.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public interface ThreadSafeHeapNode {
    /** The heap this node currently belongs to; [ThreadSafeHeap] sets it on insertion and resets it to `null` on removal. */
    public var heap: ThreadSafeHeap<*>?
    /** Position of this node in the heap's backing array; [ThreadSafeHeap] keeps it up to date and sets it to `-1` on removal. */
    public var index: Int
}
18 
/**
 * Synchronized binary heap.
 *
 * Nodes are ordered by their [Comparable] implementation, with the smallest node kept at the
 * root (array index 0). Every public operation except reading [size] / [isEmpty] runs inside
 * `synchronized(this)`; [size] is backed by an atomic and is read without taking the lock.
 *
 * While a node is stored, its [ThreadSafeHeapNode.heap] refers to this heap and its
 * [ThreadSafeHeapNode.index] is its slot in the backing array. Both are reset
 * (`null` / `-1`) when the node leaves the heap.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNode, T: Comparable<T> {
    // Backing array; allocated lazily by realloc() on the first insertion.
    // Slots at indices >= size are always null.
    private var a: Array<T?>? = null

    private val _size = atomic(0)

    /** Number of nodes currently stored in the heap. */
    public var size: Int
        get() = _size.value
        private set(value) { _size.value = value }

    /** `true` when the heap contains no nodes. */
    public val isEmpty: Boolean get() = size == 0

    /**
     * Removes all nodes from the heap.
     *
     * Every stored node is detached (its `heap` is set to `null` and its `index` to `-1`),
     * exactly as [removeAtImpl] does for a single node. Without this, a cleared node would
     * still look like a member of the heap, so a later [remove] on it would operate on an
     * empty heap and a later [addLast] would violate the `node.heap == null` precondition.
     */
    public fun clear(): Unit = synchronized(this) {
        val array = a
        if (array != null) {
            for (i in 0 until size) {
                val node = array[i] ?: continue
                node.heap = null
                node.index = -1
            }
            array.fill(null)
        }
        _size.value = 0
    }

    /**
     * Returns the first node, in backing-array order, that satisfies [predicate],
     * or `null` when no stored node matches. The scan runs under the heap lock.
     */
    public fun find(
        predicate: (value: T) -> Boolean
    ): T? = synchronized(this) block@{
        for (i in 0 until size) {
            // Slots below `size` are always occupied, hence the non-null assertion.
            val value = a?.get(i)!!
            if (predicate(value)) return@block value
        }
        null
    }

    /** Returns the root (smallest) node without removing it, or `null` when the heap is empty. */
    public fun peek(): T? = synchronized(this) { firstImpl() }

    /** Removes and returns the root (smallest) node, or returns `null` when the heap is empty. */
    public fun removeFirstOrNull(): T? = synchronized(this) {
        if (size > 0) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    /**
     * Removes and returns the root node if it satisfies [predicate].
     * Returns `null` when the heap is empty or the predicate rejects the root.
     * The predicate is evaluated while the heap lock is held.
     */
    public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
        val first = firstImpl() ?: return null
        if (predicate(first)) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    /** Inserts [node] into the heap. The node must not currently belong to any heap. */
    public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }

    /**
     * Inserts [node] only when [cond] returns `true`.
     *
     * [cond] receives the current root node (or `null` when the heap is empty) and is
     * evaluated while the heap lock is held.
     *
     * @return `true` if the node was added, `false` otherwise.
     */
    public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
        if (cond(firstImpl())) {
            addImpl(node)
            true
        } else {
            false
        }
    }

    /**
     * Removes [node] from the heap.
     *
     * A node whose `heap` is `null` is treated as not stored and left untouched.
     * Otherwise the node is expected to belong to this heap (asserted in [removeAtImpl]).
     *
     * @return `true` if the node was removed, `false` if it was not in a heap.
     */
    public fun remove(node: T): Boolean = synchronized(this) {
        if (node.heap == null) {
            false
        } else {
            val index = node.index
            assert { index >= 0 }
            removeAtImpl(index)
            true
        }
    }

    /**
     * Returns the root node without locking, or `null` when the backing array has not been
     * allocated yet or slot 0 is empty. Callers are expected to hold the heap lock.
     */
    @PublishedApi
    internal fun firstImpl(): T? = a?.get(0)

    /**
     * Removes and returns the node stored at [index], restoring heap order afterwards.
     * Must be called with the heap lock held and with a non-empty heap.
     */
    @PublishedApi
    internal fun removeAtImpl(index: Int): T {
        assert { size > 0 }
        val a = this.a!!
        // After the decrement, `size` is the index of the last occupied slot.
        size--
        if (index < size) {
            // Move the last node into the vacated position; the node being removed ends up in slot `size`.
            swap(index, size)
            val j = (index - 1) / 2
            // The moved node may need to travel up (smaller than its parent) or down.
            if (index > 0 && a[index]!! < a[j]!!) {
                swap(index, j)
                siftUpFrom(j)
            } else {
                siftDownFrom(index)
            }
        }
        val result = a[size]!!
        assert { result.heap === this }
        // Detach the removed node and release its slot.
        result.heap = null
        result.index = -1
        a[size] = null
        return result
    }

    /**
     * Appends [node] after the last occupied slot and sifts it up to its ordered position.
     * Must be called with the heap lock held; the node must not belong to any heap.
     */
    @PublishedApi
    internal fun addImpl(node: T) {
        assert { node.heap == null }
        node.heap = this
        val a = realloc()
        val i = size++
        a[i] = node
        node.index = i
        siftUpFrom(i)
    }

    // Moves the node at index [i] towards the root while it is smaller than its parent.
    private tailrec fun siftUpFrom(i: Int) {
        if (i <= 0) return
        val a = a!!
        val j = (i - 1) / 2
        if (a[j]!! <= a[i]!!) return
        swap(i, j)
        siftUpFrom(j)
    }

    // Moves the node at index [i] towards the leaves while it is larger than its smaller child.
    private tailrec fun siftDownFrom(i: Int) {
        var j = 2 * i + 1
        if (j >= size) return
        val a = a!!
        // Pick the smaller of the two children when a right child exists.
        if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
        if (a[i]!! <= a[j]!!) return
        swap(i, j)
        siftDownFrom(j)
    }

    // Returns a backing array with room for at least one more node: allocates 4 slots on
    // first use and doubles the capacity when the array is full.
    @Suppress("UNCHECKED_CAST")
    private fun realloc(): Array<T?> {
        val a = this.a
        return when {
            a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it }
            size >= a.size -> a.copyOf(size * 2).also { this.a = it }
            else -> a
        }
    }

    // Exchanges the nodes in slots [i] and [j] and updates their recorded indices.
    private fun swap(i: Int, j: Int) {
        val a = a!!
        val ni = a[j]!!
        val nj = a[i]!!
        a[i] = ni
        a[j] = nj
        ni.index = i
        nj.index = j
    }
}
168