1/** 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16package std.containers; 17 18/** 19 * BlockingQueue implementation 20 */ 21export final class BlockingQueue<T> { 22 23 constructor() { 24 // Array, where all BlockingQueue elements will be located 25 this.array = new Array<T>(BlockingQueue.initSize); 26 this.mutex = BlockingQueue.initMutex(); 27 this.event = BlockingQueue.initEvent(); 28 } 29 30 // Inserts element at the end. 31 push(el: T) { 32 BlockingQueue.lock(this.mutex); 33 if (this.curTailIdx == this.array.length) { 34 let curQueueSize: int = this.curTailIdx - this.curHeadIdx; 35 let newArr = new Array<T>((this.array.length * 2) as int); 36 for (let i: int = 0; i < curQueueSize; ++i) { 37 newArr[i] = this.array[this.curHeadIdx++]; 38 } 39 this.array = newArr; 40 this.curHeadIdx = 0; 41 this.curTailIdx = curQueueSize; 42 } 43 this.array[this.curTailIdx++] = el; 44 BlockingQueue.unblockWaiters(this.mutex, this.event); 45 } 46 47 // Removes and returns the first element in a BlockingQueue. 48 // If the BlockingQueue is empty then await_ method is called. 49 pop(): T { 50 BlockingQueue.lock(this.mutex); 51 while (this.curHeadIdx == this.curTailIdx) { 52 // Currently, the queue is empty. Need to await. 53 // This method is used for waiting until the BlockingQueue 54 // has at least one element. 55 // For more information please check this method's implementation. 56 BlockingQueue.await_(this.mutex, this.event); 57 } 58 try { 59 let curQueueSize: int = this.curTailIdx - this.curHeadIdx; 60 61 // If number of elements in a BlockingQueue is too small 62 // compared to the BlockingQueue capacity, it is better to reduce capacity. 63 if (curQueueSize * BlockingQueue.sizeReductionCheckCoef < this.array.length && 64 this.array.length >= BlockingQueue.minSizeToReduce) { 65 66 let newArr = new Array<T>((this.array.length / 2) as int); 67 for (let i: int = 0; i < curQueueSize; ++i) { 68 newArr[i] = this.array[this.curHeadIdx++]; 69 } 70 this.array = newArr; 71 this.curHeadIdx = 0; 72 this.curTailIdx = curQueueSize; 73 } 74 75 // If there is a lot of free space at the beginning of an array in a BlockingQueue, 76 // it is better to copy elements to the beginning - it saves space for new objects. 77 if (this.curHeadIdx * 2 > this.array.length) { 78 assert curQueueSize < this.array.length; 79 for (let i: int = 0; i < curQueueSize; ++i) { 80 this.array[i] = this.array[this.curHeadIdx++]; 81 } 82 this.curHeadIdx = 0; 83 this.curTailIdx = curQueueSize; 84 } 85 return this.array[this.curHeadIdx++]; 86 } finally { 87 BlockingQueue.unlock(this.mutex); 88 } 89 } 90 91 // Returns the first element in a BlockingQueue. 92 // If the BlockingQueue is empty then await_ method is called. 93 front(): T { 94 BlockingQueue.lock(this.mutex); 95 while (this.curHeadIdx == this.curTailIdx) { 96 // Currently, the queue is empty. Need to await. 97 // This method is used for waiting until the BlockingQueue 98 // has at least one element. 99 // For more information please check this method's implementation. 100 BlockingQueue.await_(this.mutex, this.event); 101 } 102 try { 103 return this.array[this.curHeadIdx]; 104 } finally { 105 BlockingQueue.unlock(this.mutex); 106 } 107 } 108 109 // Returns the last element in a BlockingQueue. 110 // If the BlockingQueue is empty then await_ method is called. 111 back(): T { 112 BlockingQueue.lock(this.mutex); 113 while (this.curHeadIdx == this.curTailIdx) { 114 // Currently, the queue is empty. Need to await. 115 // This method is used for waiting until the BlockingQueue 116 // has at least one element. 117 // For more information please check this method's implementation. 118 BlockingQueue.await_(this.mutex, this.event); 119 } 120 try { 121 return this.array[this.curTailIdx - 1]; 122 } finally { 123 BlockingQueue.unlock(this.mutex); 124 } 125 } 126 127 // Returns size of a BlockingQueue. 128 size(): int { 129 BlockingQueue.lock(this.mutex); 130 try { 131 return this.curTailIdx - this.curHeadIdx; 132 } finally { 133 BlockingQueue.unlock(this.mutex); 134 } 135 } 136 137 // Returns true if a BlockingQueue has no elements. 138 empty(): boolean { 139 return this.size() == 0; 140 } 141 142 // Frees internal memory, used by BlockingQueue instance. 143 // After calling this method, current BlockingQueue instance cannot be used anymore. 144 // This method MUST be called after finishing working with the BlockingQueue. 145 cleanup() { 146 BlockingQueue.deleteMutex(this.mutex); 147 BlockingQueue.deleteEvent(this.event); 148 } 149 150 private array: Array<T>; 151 private curHeadIdx: int = 0; // Index of the first element in a BlockingQueue 152 private curTailIdx: int = 0; // Index of the last element in a BlockingQueue 153 private mutex: long; // Mutex runtime pointer 154 private event: long; // Event runtime pointer 155 private static initSize: int = 8; // Initial size of a BlockingQueue 156 private static minSizeToReduce: int = 16; // Minimal size for capacity decrease (see pop()) 157 private static sizeReductionCheckCoef: int = 3; // Coeff used for BlockingQueue fullness estimation 158 159 private static native initMutex(): long; // Mutex creation 160 private static native initEvent(): long; // Event creation 161 private static native deleteMutex(mutex: long): void; // Mutex deletion 162 private static native deleteEvent(event: long): void; // Event deletion 163 private static native lock(mutex: long): void; // Mutex lock 164 private static native unlock(mutex: long): void; // Mutex unlock 165 private static native await_(mutex: long, event: long): void; // Called in case BlockingQueue is empty 166 private static native unblockWaiters(mutex: long, event: long): void; // Called to wake up all waiters 167} 168