1/** 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16package std.containers; 17 18/** 19 * BlockingQueue implementation 20 */ 21export final class BlockingQueue<T> { 22 23 constructor() { 24 // Array, where all BlockingQueue elements will be located 25 this.array = new Array<T>(BlockingQueue.initSize); 26 this.mutex = ConcurrencyHelpers.mutexCreate(); 27 this.condVar = ConcurrencyHelpers.condVarCreate(); 28 } 29 30 // Inserts element at the end. 31 push(el: T) { 32 ConcurrencyHelpers.mutexLock(this.mutex); 33 if (this.curTailIdx == this.array.length) { 34 this.stretch(); 35 } 36 this.array[this.curTailIdx++] = el; 37 ConcurrencyHelpers.condVarNotifyOne(this.condVar, this.mutex); 38 ConcurrencyHelpers.mutexUnlock(this.mutex); 39 } 40 41 // Removes and returns the first element in a BlockingQueue. 42 // If the BlockingQueue is empty then wait method is called. 43 pop(): T { 44 ConcurrencyHelpers.mutexLock(this.mutex); 45 this.waitUntilQueueIsEmpty(); 46 try { 47 let curQueueSize: int = this.curTailIdx - this.curHeadIdx; 48 49 // If number of elements in a BlockingQueue is too small 50 // compared to the BlockingQueue capacity, it is better to reduce capacity. 51 if (curQueueSize * BlockingQueue.sizeReductionCheckCoef < this.array.length && 52 this.array.length >= BlockingQueue.minSizeToReduce) { 53 this.shrink(); 54 } 55 56 // If there is a lot of free space at the beginning of an array in a BlockingQueue, 57 // it is better to copy elements to the beginning - it saves space for new objects. 58 if (this.curHeadIdx * 2 > this.array.length) { 59 assert curQueueSize < this.array.length; 60 for (let i: int = 0; i < curQueueSize; ++i) { 61 this.array[i] = this.array[this.curHeadIdx++]; 62 } 63 this.curHeadIdx = 0; 64 this.curTailIdx = curQueueSize; 65 } 66 return this.array[this.curHeadIdx++]; 67 } finally { 68 ConcurrencyHelpers.mutexUnlock(this.mutex); 69 } 70 } 71 72 // Returns the first element in a BlockingQueue. 73 // If the BlockingQueue is empty then wait method is called. 74 front(): T { 75 ConcurrencyHelpers.mutexLock(this.mutex); 76 this.waitUntilQueueIsEmpty(); 77 try { 78 return this.array[this.curHeadIdx]; 79 } finally { 80 ConcurrencyHelpers.mutexUnlock(this.mutex); 81 } 82 } 83 84 // Returns the last element in a BlockingQueue. 85 // If the BlockingQueue is empty then wait method is called. 86 back(): T { 87 ConcurrencyHelpers.mutexLock(this.mutex); 88 this.waitUntilQueueIsEmpty(); 89 try { 90 return this.array[this.curTailIdx - 1]; 91 } finally { 92 ConcurrencyHelpers.mutexUnlock(this.mutex); 93 } 94 } 95 96 // Returns size of a BlockingQueue. 97 size(): int { 98 ConcurrencyHelpers.mutexLock(this.mutex); 99 try { 100 return this.curTailIdx - this.curHeadIdx; 101 } finally { 102 ConcurrencyHelpers.mutexUnlock(this.mutex); 103 } 104 } 105 106 // Returns true if a BlockingQueue has no elements. 107 empty(): boolean { 108 return this.size() == 0; 109 } 110 111 private shrink() { 112 this.resizeQueue((this.array.length / 2) as int); 113 } 114 115 private stretch() { 116 this.resizeQueue((this.array.length * 2) as int); 117 } 118 119 private resizeQueue(size: int) { 120 let curQueueSize: int = this.curTailIdx - this.curHeadIdx; 121 let newArr = new Array<T>(size); 122 for (let i: int = 0; i < curQueueSize; ++i) { 123 newArr[i] = this.array[this.curHeadIdx++]; 124 } 125 this.array = newArr; 126 this.curHeadIdx = 0; 127 this.curTailIdx = curQueueSize; 128 } 129 130 private waitUntilQueueIsEmpty() { 131 while (this.curHeadIdx == this.curTailIdx) { 132 // Currently, the queue is empty. Need to await. 133 // This method is used for waiting until the BlockingQueue 134 // has at least one element. 135 ConcurrencyHelpers.condVarWait(this.condVar, this.mutex); 136 } 137 } 138 139 private array: Array<T>; 140 private curHeadIdx: int = 0; // Index of the first element in a BlockingQueue 141 private curTailIdx: int = 0; // Index of the last element in a BlockingQueue 142 private mutex: Object; // push/pop Mutex 143 private condVar: Object; // push/pop CondVar 144 private static initSize: int = 8; // Initial size of a BlockingQueue 145 private static minSizeToReduce: int = 16; // Minimal size for capacity decrease (see pop()) 146 private static sizeReductionCheckCoef: int = 3; // Coeff used for BlockingQueue fullness estimation 147} 148