1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.internal 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.* 9 10 /** 11 * @suppress **This an internal API and should not be used from general code.** 12 */ 13 @InternalCoroutinesApi 14 public interface ThreadSafeHeapNode { 15 public var heap: ThreadSafeHeap<*>? 16 public var index: Int 17 } 18 19 /** 20 * Synchronized binary heap. 21 * @suppress **This an internal API and should not be used from general code.** 22 */ 23 @InternalCoroutinesApi 24 public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNode, T: Comparable<T> { 25 private var a: Array<T?>? = null 26 27 private val _size = atomic(0) 28 29 public var size: Int 30 get() = _size.value 31 private set(value) { _size.value = value } 32 33 public val isEmpty: Boolean get() = size == 0 34 <lambda>null35 public fun clear(): Unit = synchronized(this) { 36 a?.fill(null) 37 _size.value = 0 38 } 39 findnull40 public fun find( 41 predicate: (value: T) -> Boolean 42 ): T? = synchronized(this) block@{ 43 for (i in 0 until size) { 44 val value = a?.get(i)!! 45 if (predicate(value)) return@block value 46 } 47 null 48 } 49 <lambda>null50 public fun peek(): T? = synchronized(this) { firstImpl() } 51 <lambda>null52 public fun removeFirstOrNull(): T? = synchronized(this) { 53 if (size > 0) { 54 removeAtImpl(0) 55 } else { 56 null 57 } 58 } 59 <lambda>null60 public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) { 61 val first = firstImpl() ?: return null 62 if (predicate(first)) { 63 removeAtImpl(0) 64 } else { 65 null 66 } 67 } 68 <lambda>null69 public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) } 70 71 // Condition also receives current first node in the heap <lambda>null72 public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) { 73 if (cond(firstImpl())) { 74 addImpl(node) 75 true 76 } else { 77 false 78 } 79 } 80 <lambda>null81 public fun remove(node: T): Boolean = synchronized(this) { 82 return if (node.heap == null) { 83 false 84 } else { 85 val index = node.index 86 assert { index >= 0 } 87 removeAtImpl(index) 88 true 89 } 90 } 91 92 @PublishedApi firstImplnull93 internal fun firstImpl(): T? = a?.get(0) 94 95 @PublishedApi 96 internal fun removeAtImpl(index: Int): T { 97 assert { size > 0 } 98 val a = this.a!! 99 size-- 100 if (index < size) { 101 swap(index, size) 102 val j = (index - 1) / 2 103 if (index > 0 && a[index]!! < a[j]!!) { 104 swap(index, j) 105 siftUpFrom(j) 106 } else { 107 siftDownFrom(index) 108 } 109 } 110 val result = a[size]!! 111 assert { result.heap === this } 112 result.heap = null 113 result.index = -1 114 a[size] = null 115 return result 116 } 117 118 @PublishedApi addImplnull119 internal fun addImpl(node: T) { 120 assert { node.heap == null } 121 node.heap = this 122 val a = realloc() 123 val i = size++ 124 a[i] = node 125 node.index = i 126 siftUpFrom(i) 127 } 128 siftUpFromnull129 private tailrec fun siftUpFrom(i: Int) { 130 if (i <= 0) return 131 val a = a!! 132 val j = (i - 1) / 2 133 if (a[j]!! <= a[i]!!) return 134 swap(i, j) 135 siftUpFrom(j) 136 } 137 siftDownFromnull138 private tailrec fun siftDownFrom(i: Int) { 139 var j = 2 * i + 1 140 if (j >= size) return 141 val a = a!! 142 if (j + 1 < size && a[j + 1]!! < a[j]!!) j++ 143 if (a[i]!! <= a[j]!!) return 144 swap(i, j) 145 siftDownFrom(j) 146 } 147 148 @Suppress("UNCHECKED_CAST") reallocnull149 private fun realloc(): Array<T?> { 150 val a = this.a 151 return when { 152 a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it } 153 size >= a.size -> a.copyOf(size * 2).also { this.a = it } 154 else -> a 155 } 156 } 157 swapnull158 private fun swap(i: Int, j: Int) { 159 val a = a!! 160 val ni = a[j]!! 161 val nj = a[i]!! 162 a[i] = ni 163 a[j] = nj 164 ni.index = i 165 nj.index = j 166 } 167 } 168