/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*

/**
 * An element that can be stored in a [ThreadSafeHeap].
 *
 * Both properties are maintained by the heap itself: [heap] is set to the owning heap when the node is added
 * and reset to `null` when it is removed; [index] is the node's current slot in the heap's backing array and
 * is reset to `-1` when the node is removed.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public interface ThreadSafeHeapNode {
    public var heap: ThreadSafeHeap<*>?
    public var index: Int
}

/**
 * Synchronized binary heap.
 *
 * The heap is ordered by [Comparable.compareTo] with the smallest element at the root, so [peek] and
 * [removeFirstOrNull] operate on the minimum. Every public mutating or reading operation runs inside
 * `synchronized(this)`; [size] and [isEmpty] read an atomic counter without taking the lock.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public open class ThreadSafeHeap<T> : SynchronizedObject() where T : ThreadSafeHeapNode, T : Comparable<T> {
    // Backing array; allocated lazily by realloc() on the first insertion. Slots at and beyond `size` are null.
    private var a: Array<T?>? = null

    private val _size = atomic(0)

    /** Number of nodes currently stored in the heap. */
    public var size: Int
        get() = _size.value
        private set(value) { _size.value = value }

    /** `true` when the heap holds no nodes. */
    public val isEmpty: Boolean get() = size == 0

    /**
     * Removes all nodes from the heap.
     *
     * Every stored node is detached the same way [removeAtImpl] detaches a single node (`heap = null`,
     * `index = -1`), so that a cleared node can afterwards be passed to [remove] (which then returns `false`)
     * or re-added with [addLast].
     */
    public fun clear(): Unit = synchronized(this) {
        val a = this.a
        if (a != null) {
            // Without this reset a cleared node would keep a stale heap reference and index: a later
            // remove(node) would call removeAtImpl() on an empty heap and corrupt `size`, and
            // addLast(node) would violate the `node.heap == null` precondition of addImpl().
            for (i in 0 until size) {
                val node = a[i]
                if (node != null) {
                    node.heap = null
                    node.index = -1
                }
            }
            a.fill(null)
        }
        _size.value = 0
    }

    /** Returns the smallest node without removing it, or `null` if the heap is empty. */
    public fun peek(): T? = synchronized(this) { firstImpl() }

    /** Removes and returns the smallest node, or returns `null` if the heap is empty. */
    public fun removeFirstOrNull(): T? = synchronized(this) {
        if (size > 0) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
    /**
     * Removes and returns the smallest node if it satisfies [predicate].
     *
     * Returns `null` when the heap is empty or when [predicate] returns `false` for the first node.
     * [predicate] is invoked while the heap's lock is held.
     */
    public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
        val first = firstImpl() ?: return null
        if (predicate(first)) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    /** Adds [node] to the heap. The node must not currently belong to a heap (`node.heap == null`). */
    public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }

    // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
    // Condition also receives current first node in the heap
    /**
     * Adds [node] to the heap only if [cond] returns `true`.
     *
     * [cond] receives the current first node (or `null` if the heap is empty) and is invoked while the
     * heap's lock is held.
     *
     * @return `true` if the node was added, `false` otherwise.
     */
    public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
        if (cond(firstImpl())) {
            addImpl(node)
            true
        } else {
            false
        }
    }

    /**
     * Removes [node] from the heap.
     *
     * Only `node.heap == null` is checked here; a node with a non-null [ThreadSafeHeapNode.heap] is assumed
     * to belong to this heap (this is verified only by an assertion inside [removeAtImpl]).
     *
     * @return `false` if the node is not in a heap, `true` if it was removed.
     */
    public fun remove(node: T): Boolean = synchronized(this) {
        return if (node.heap == null) {
            false
        } else {
            val index = node.index
            assert { index >= 0 }
            removeAtImpl(index)
            true
        }
    }

    /**
     * Returns the node in slot 0, or `null` if the backing array is not allocated yet or the slot is empty.
     * Must be called with the heap's lock held.
     */
    @PublishedApi
    internal fun firstImpl(): T? = a?.get(0)

    /**
     * Removes and returns the node at [index], restoring the heap order afterwards.
     * Must be called with the heap's lock held and with a non-empty heap.
     */
    @PublishedApi
    internal fun removeAtImpl(index: Int): T {
        assert { size > 0 }
        val a = this.a!!
        size--
        if (index < size) {
            // Move the last node into the vacated slot (the removed node goes to slot `size`),
            // then sift the moved node up or down depending on how it compares with its parent.
            swap(index, size)
            val j = (index - 1) / 2
            if (index > 0 && a[index]!! < a[j]!!) {
                swap(index, j)
                siftUpFrom(j)
            } else {
                siftDownFrom(index)
            }
        }
        // The removed node now sits just past the live range; detach it and free the slot.
        val result = a[size]!!
        assert { result.heap === this }
        result.heap = null
        result.index = -1
        a[size] = null
        return result
    }

    /**
     * Appends [node] to the backing array and sifts it up to its place.
     * Must be called with the heap's lock held; [node] must not belong to a heap.
     */
    @PublishedApi
    internal fun addImpl(node: T) {
        assert { node.heap == null }
        node.heap = this
        val a = realloc()
        val i = size++
        a[i] = node
        node.index = i
        siftUpFrom(i)
    }

    // Swaps the node at `i` with its parent while the parent is greater than it.
    private tailrec fun siftUpFrom(i: Int) {
        if (i <= 0) return
        val a = a!!
        val j = (i - 1) / 2
        if (a[j]!! <= a[i]!!) return
        swap(i, j)
        siftUpFrom(j)
    }

    // Swaps the node at `i` with its smaller child while that child is less than it.
    private tailrec fun siftDownFrom(i: Int) {
        var j = 2 * i + 1
        if (j >= size) return
        val a = a!!
        if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
        if (a[i]!! <= a[j]!!) return
        swap(i, j)
        siftDownFrom(j)
    }

    // Returns a backing array with room for one more node: allocates 4 slots on first use and
    // doubles the capacity when the array is full. T is not reified, hence the unchecked cast.
    @Suppress("UNCHECKED_CAST")
    private fun realloc(): Array<T?> {
        val a = this.a
        return when {
            a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it }
            size >= a.size -> a.copyOf(size * 2).also { this.a = it }
            else -> a
        }
    }

    // Exchanges the nodes in slots `i` and `j` and updates their stored indices to match.
    private fun swap(i: Int, j: Int) {
        val a = a!!
        val ni = a[j]!!
        val nj = a[i]!!
        a[i] = ni
        a[j] = nj
        ni.index = i
        nj.index = j
    }
}