• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16package std.containers;
17
/**
 * BlockingQueue implementation.
 *
 * A growable FIFO queue whose read operations (pop, front, back) wait while the
 * queue is empty. Elements occupy the half-open index window
 * [curHeadIdx, curTailIdx) of a backing array; push() doubles that array when the
 * tail reaches its end, pop() halves or compacts it when it becomes sparse.
 *
 * Every public operation brackets its access to the queue state with the native
 * lock()/unlock() pair on the instance mutex.
 *
 * NOTE(review): the native helpers are implemented outside this file. The code
 * below is written on the assumption that await_() returns with the mutex held
 * and that unblockWaiters() releases the mutex after waking waiters (push() has
 * no other unlock on its success path) - confirm against the runtime.
 */
export final class BlockingQueue<T> {

    constructor() {
        // Backing array, where all BlockingQueue elements will be located
        this.array = new Array<T>(BlockingQueue.initSize);
        this.mutex = BlockingQueue.initMutex();
        this.event = BlockingQueue.initEvent();
    }

    // Inserts element at the end.
    // Never waits for free space: when the tail index reaches the end of the backing
    // array, the live elements are moved to the start of a new array of twice the length.
    push(el: T) {
        BlockingQueue.lock(this.mutex);
        let stored: boolean = false;
        try {
            if (this.curTailIdx == this.array.length) {
                let curQueueSize: int = this.curTailIdx - this.curHeadIdx;
                this.relocate(new Array<T>((this.array.length * 2) as int), curQueueSize);
            }
            this.array[this.curTailIdx++] = el;
            stored = true;
        } finally {
            if (stored) {
                // Normal path, same call as before: wake up waiters.
                // Presumably this also releases the mutex (see class note).
                BlockingQueue.unblockWaiters(this.mutex, this.event);
            } else {
                // Growing the array threw: nothing was added, so there is nobody
                // to wake up, but the mutex must not stay locked.
                BlockingQueue.unlock(this.mutex);
            }
        }
    }

    // Removes and returns the first element in a BlockingQueue.
    // If the BlockingQueue is empty then await_ method is called until an element appears.
    // The vacated slot is not cleared; it keeps its old value until it is overwritten
    // or the backing array is replaced.
    pop(): T {
        BlockingQueue.lock(this.mutex);
        this.awaitNonEmpty();
        try {
            let curQueueSize: int = this.curTailIdx - this.curHeadIdx;

            // If number of elements in a BlockingQueue is too small
            // compared to the BlockingQueue capacity, it is better to reduce capacity:
            // move the elements into an array of half the length.
            if (curQueueSize * BlockingQueue.sizeReductionCheckCoef < this.array.length &&
                this.array.length >= BlockingQueue.minSizeToReduce) {
                this.relocate(new Array<T>((this.array.length / 2) as int), curQueueSize);
            }

            // If there is a lot of free space at the beginning of an array in a BlockingQueue
            // (head is past the middle), it is better to copy elements to the beginning
            // of the same array - it saves space for new objects.
            if (this.curHeadIdx * 2 > this.array.length) {
                assert curQueueSize < this.array.length;
                this.relocate(this.array, curQueueSize);
            }
            return this.array[this.curHeadIdx++];
        } finally {
            BlockingQueue.unlock(this.mutex);
        }
    }

    // Returns the first element in a BlockingQueue without removing it.
    // If the BlockingQueue is empty then await_ method is called until an element appears.
    front(): T {
        BlockingQueue.lock(this.mutex);
        this.awaitNonEmpty();
        try {
            return this.array[this.curHeadIdx];
        } finally {
            BlockingQueue.unlock(this.mutex);
        }
    }

    // Returns the last element in a BlockingQueue without removing it.
    // If the BlockingQueue is empty then await_ method is called until an element appears.
    back(): T {
        BlockingQueue.lock(this.mutex);
        this.awaitNonEmpty();
        try {
            // curTailIdx points one past the last element
            return this.array[this.curTailIdx - 1];
        } finally {
            BlockingQueue.unlock(this.mutex);
        }
    }

    // Returns size of a BlockingQueue (number of elements currently stored).
    size(): int {
        BlockingQueue.lock(this.mutex);
        try {
            return this.curTailIdx - this.curHeadIdx;
        } finally {
            BlockingQueue.unlock(this.mutex);
        }
    }

    // Returns true if a BlockingQueue has no elements.
    // The answer is the result of a single size() call.
    empty(): boolean {
        return this.size() == 0;
    }

    // Frees internal memory, used by BlockingQueue instance.
    // After calling this method, current BlockingQueue instance cannot be used anymore.
    // This method MUST be called after finishing working with the BlockingQueue.
    cleanup() {
        BlockingQueue.deleteMutex(this.mutex);
        BlockingQueue.deleteEvent(this.event);
    }

    // Waits until the BlockingQueue has at least one element.
    // Must be called right after lock(): the emptiness check is re-evaluated
    // every time await_ returns, and the loop ends only once head != tail.
    private awaitNonEmpty(): void {
        while (this.curHeadIdx == this.curTailIdx) {
            // Currently, the queue is empty. Need to await.
            BlockingQueue.await_(this.mutex, this.event);
        }
    }

    // Copies the `count` elements starting at curHeadIdx to indices [0, count) of `target`,
    // then makes `target` the backing array with head at 0 and tail at `count`.
    // `target` may be the current backing array itself (in-place compaction): the write
    // index never gets ahead of the read index, so no unread element is overwritten.
    // Must be called with the mutex locked.
    private relocate(target: Array<T>, count: int): void {
        let src: int = this.curHeadIdx;
        for (let i: int = 0; i < count; ++i) {
            target[i] = this.array[src++];
        }
        this.array = target;
        this.curHeadIdx = 0;
        this.curTailIdx = count;
    }

    private array: Array<T>;                        // Backing storage; live elements are [curHeadIdx, curTailIdx)
    private curHeadIdx: int = 0;                    // Index of the first element in a BlockingQueue
    private curTailIdx: int = 0;                    // Index one past the last element (slot for the next push)
    private mutex: long;                            // Mutex runtime pointer
    private event: long;                            // Event runtime pointer
    private static initSize: int = 8;               // Initial size of a BlockingQueue
    private static minSizeToReduce: int = 16;       // Minimal size for capacity decrease (see pop())
    private static sizeReductionCheckCoef: int = 3; // Capacity is halved when size * coef < capacity (see pop())

    private static native initMutex(): long;                              // Mutex creation
    private static native initEvent(): long;                              // Event creation
    private static native deleteMutex(mutex: long): void;                 // Mutex deletion
    private static native deleteEvent(event: long): void;                 // Event deletion
    private static native lock(mutex: long): void;                        // Mutex lock
    private static native unlock(mutex: long): void;                      // Mutex unlock
    private static native await_(mutex: long, event: long): void;         // Called in case BlockingQueue is empty
    private static native unblockWaiters(mutex: long, event: long): void; // Called to wake up all waiters
}
168