• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/**
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16package std.containers;
17
/**
 * BlockingQueue implementation.
 *
 * A FIFO queue stored in a growable array. Elements live in the index range
 * [curHeadIdx, curTailIdx) of `array`. Every public method takes `mutex`
 * for the duration of the call. pop(), front() and back() block on `condVar`
 * while the queue is empty; push() signals `condVar` after inserting.
 *
 * NOTE(review): the exact semantics of ConcurrencyHelpers (e.g. that
 * condVarWait releases `mutex` while waiting and re-acquires it before
 * returning) are not visible in this file and are assumed to follow the
 * usual condition-variable contract - confirm against its documentation.
 */
export final class BlockingQueue<T> {

    constructor() {
        // Array, where all BlockingQueue elements will be located
        this.array = new Array<T>(BlockingQueue.initSize);
        this.mutex = ConcurrencyHelpers.mutexCreate();
        this.condVar = ConcurrencyHelpers.condVarCreate();
    }

    // Inserts element at the end and notifies one waiter.
    // The backing array is doubled first if the tail has reached its end.
    push(el: T): void {
        ConcurrencyHelpers.mutexLock(this.mutex);
        try {
            if (this.curTailIdx == this.array.length) {
                this.stretch();
            }
            this.array[this.curTailIdx++] = el;
            ConcurrencyHelpers.condVarNotifyOne(this.condVar, this.mutex);
        } finally {
            // Release the lock even if growing the array failed.
            ConcurrencyHelpers.mutexUnlock(this.mutex);
        }
    }

    // Removes and returns the first element in a BlockingQueue.
    // If the BlockingQueue is empty, blocks until an element is pushed.
    pop(): T {
        ConcurrencyHelpers.mutexLock(this.mutex);
        try {
            this.waitUntilQueueIsNotEmpty();
            let curQueueSize: int = this.curTailIdx - this.curHeadIdx;

            // If number of elements in a BlockingQueue is too small
            // compared to the BlockingQueue capacity, it is better to reduce capacity.
            if (curQueueSize * BlockingQueue.sizeReductionCheckCoef < this.array.length &&
                this.array.length >= BlockingQueue.minSizeToReduce) {
                this.shrink();
            }

            // If there is a lot of free space at the beginning of an array in a BlockingQueue,
            // it is better to copy elements to the beginning - it saves space for new objects.
            if (this.curHeadIdx * 2 > this.array.length) {
                assert curQueueSize < this.array.length;
                // The head is past the middle, so the destination index i is always
                // below the source index head + i: no element is overwritten before it is read.
                let head: int = this.curHeadIdx;
                for (let i: int = 0; i < curQueueSize; ++i) {
                    this.array[i] = this.array[head + i];
                }
                this.curHeadIdx = 0;
                this.curTailIdx = curQueueSize;
            }
            return this.array[this.curHeadIdx++];
        } finally {
            ConcurrencyHelpers.mutexUnlock(this.mutex);
        }
    }

    // Returns the first element in a BlockingQueue without removing it.
    // If the BlockingQueue is empty, blocks until an element is pushed.
    front(): T {
        ConcurrencyHelpers.mutexLock(this.mutex);
        try {
            this.waitUntilQueueIsNotEmpty();
            // front() does not consume the element. push() wakes only one waiter,
            // so if that waiter is this call, a blocked pop() would keep sleeping
            // on a non-empty queue. Pass the notification on to the next waiter.
            ConcurrencyHelpers.condVarNotifyOne(this.condVar, this.mutex);
            return this.array[this.curHeadIdx];
        } finally {
            ConcurrencyHelpers.mutexUnlock(this.mutex);
        }
    }

    // Returns the last element in a BlockingQueue without removing it.
    // If the BlockingQueue is empty, blocks until an element is pushed.
    back(): T {
        ConcurrencyHelpers.mutexLock(this.mutex);
        try {
            this.waitUntilQueueIsNotEmpty();
            // back() does not consume the element either: forward the wakeup
            // for the same reason as in front().
            ConcurrencyHelpers.condVarNotifyOne(this.condVar, this.mutex);
            return this.array[this.curTailIdx - 1];
        } finally {
            ConcurrencyHelpers.mutexUnlock(this.mutex);
        }
    }

    // Returns the number of elements currently stored in a BlockingQueue.
    size(): int {
        ConcurrencyHelpers.mutexLock(this.mutex);
        try {
            return this.curTailIdx - this.curHeadIdx;
        } finally {
            ConcurrencyHelpers.mutexUnlock(this.mutex);
        }
    }

    // Returns true if a BlockingQueue has no elements.
    empty(): boolean {
        return this.size() == 0;
    }

    // Halves the capacity of the backing array. Must be called with the mutex held.
    private shrink(): void {
        this.resizeQueue((this.array.length / 2) as int);
    }

    // Doubles the capacity of the backing array. Must be called with the mutex held.
    private stretch(): void {
        this.resizeQueue((this.array.length * 2) as int);
    }

    // Moves the stored elements into a fresh array of the given size,
    // placing them from index 0. Callers must pass a size that is not
    // smaller than the current number of elements.
    private resizeQueue(size: int): void {
        let curQueueSize: int = this.curTailIdx - this.curHeadIdx;
        let head: int = this.curHeadIdx;
        let newArr = new Array<T>(size);
        for (let i: int = 0; i < curQueueSize; ++i) {
            newArr[i] = this.array[head + i];
        }
        this.array = newArr;
        this.curHeadIdx = 0;
        this.curTailIdx = curQueueSize;
    }

    // Blocks the caller until the BlockingQueue has at least one element.
    // Must be called with the mutex held.
    private waitUntilQueueIsNotEmpty(): void {
        // Re-check the condition after every wakeup: another consumer may
        // have taken the element before this caller got to run.
        while (this.curHeadIdx == this.curTailIdx) {
            // Currently, the queue is empty. Need to await.
            ConcurrencyHelpers.condVarWait(this.condVar, this.mutex);
        }
    }

    private array: Array<T>;
    private curHeadIdx: int = 0;                    // Index of the first element in a BlockingQueue
    private curTailIdx: int = 0;                    // Index one past the last element (next free slot)
    private mutex: Object;                          // Mutex guarding all queue state
    private condVar: Object;                        // CondVar signalled by push, awaited by pop/front/back
    private static initSize: int = 8;               // Initial size of a BlockingQueue
    private static minSizeToReduce: int = 16;       // Minimal size for capacity decrease (see pop())
    private static sizeReductionCheckCoef: int = 3; // Coeff used for BlockingQueue fullness estimation
}
148