1 //
2 // Copyright 2023 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // CircularBuffer_unittest:
7 // Tests of the CircularBuffer class
8 //
9
10 #include <gtest/gtest.h>
11
12 #include "common/FixedQueue.h"
13
14 #include <chrono>
15 #include <thread>
16
17 namespace angle
18 {
19 // Make sure the various constructors compile and do basic checks
TEST(FixedQueue,Constructors)20 TEST(FixedQueue, Constructors)
21 {
22 FixedQueue<int> q(5);
23 EXPECT_EQ(0u, q.size());
24 EXPECT_EQ(true, q.empty());
25 }
26
27 // Make sure the destructor destroys all elements.
TEST(FixedQueue,Destructor)28 TEST(FixedQueue, Destructor)
29 {
30 struct s
31 {
32 s() : counter(nullptr) {}
33 s(int *c) : counter(c) {}
34 ~s()
35 {
36 if (counter)
37 {
38 ++*counter;
39 }
40 }
41
42 s(const s &) = default;
43 s &operator=(const s &) = default;
44
45 int *counter;
46 };
47
48 int destructorCount = 0;
49
50 {
51 FixedQueue<s> q(11);
52 q.push(s(&destructorCount));
53 // Destructor called once for the temporary above.
54 EXPECT_EQ(1, destructorCount);
55 }
56
57 // Destructor should be called one more time for the element we pushed.
58 EXPECT_EQ(2, destructorCount);
59 }
60
61 // Make sure the pop destroys the element.
TEST(FixedQueue,Pop)62 TEST(FixedQueue, Pop)
63 {
64 struct s
65 {
66 s() : counter(nullptr) {}
67 s(int *c) : counter(c) {}
68 ~s()
69 {
70 if (counter)
71 {
72 ++*counter;
73 }
74 }
75
76 s(const s &) = default;
77 s &operator=(const s &s)
78 {
79 // increment if we are overwriting the custom initialized object
80 if (counter)
81 {
82 ++*counter;
83 }
84 counter = s.counter;
85 return *this;
86 }
87
88 int *counter;
89 };
90
91 int destructorCount = 0;
92
93 FixedQueue<s> q(11);
94 q.push(s(&destructorCount));
95 // Destructor called once for the temporary above.
96 EXPECT_EQ(1, destructorCount);
97 q.pop();
98 // Copy assignment should be called for the element we popped.
99 EXPECT_EQ(2, destructorCount);
100 }
101
102 // Test circulating behavior.
TEST(FixedQueue,WrapAround)103 TEST(FixedQueue, WrapAround)
104 {
105 FixedQueue<int> q(7);
106
107 for (int i = 0; i < 7; ++i)
108 {
109 q.push(i);
110 }
111
112 EXPECT_EQ(0, q.front());
113 q.pop();
114 // This should wrap around
115 q.push(7);
116 for (int i = 0; i < 7; ++i)
117 {
118 EXPECT_EQ(i + 1, q.front());
119 q.pop();
120 }
121 }
122
123 // Test concurrent push and pop behavior.
TEST(FixedQueue,ConcurrentPushPop)124 TEST(FixedQueue, ConcurrentPushPop)
125 {
126 FixedQueue<uint64_t> q(7);
127 double timeOut = 1.0;
128 uint64_t kMaxLoop = 1000000ull;
129 std::atomic<bool> enqueueThreadFinished;
130 enqueueThreadFinished = false;
131 std::atomic<bool> dequeueThreadFinished;
132 dequeueThreadFinished = false;
133
134 std::thread enqueueThread = std::thread([&]() {
135 std::time_t t1 = std::time(nullptr);
136 uint64_t value = 0;
137 do
138 {
139 while (q.full() && !dequeueThreadFinished)
140 {
141 std::this_thread::sleep_for(std::chrono::microseconds(1));
142 }
143 if (dequeueThreadFinished)
144 {
145 break;
146 }
147 q.push(value);
148 value++;
149 } while (difftime(std::time(nullptr), t1) < timeOut && value < kMaxLoop);
150 ASSERT(difftime(std::time(nullptr), t1) >= timeOut || value >= kMaxLoop);
151 enqueueThreadFinished = true;
152 });
153
154 std::thread dequeueThread = std::thread([&]() {
155 std::time_t t1 = std::time(nullptr);
156 uint64_t expectedValue = 0;
157 do
158 {
159 while (q.empty() && !enqueueThreadFinished)
160 {
161 std::this_thread::sleep_for(std::chrono::microseconds(1));
162 }
163
164 EXPECT_EQ(expectedValue, q.front());
165 // test pop
166 q.pop();
167
168 expectedValue++;
169 } while (difftime(std::time(nullptr), t1) < timeOut && expectedValue < kMaxLoop);
170 ASSERT(difftime(std::time(nullptr), t1) >= timeOut || expectedValue >= kMaxLoop);
171 dequeueThreadFinished = true;
172 });
173
174 enqueueThread.join();
175 dequeueThread.join();
176 }
177
178 // Test concurrent push and pop behavior. When queue is full, instead of wait, it will try to
179 // increase capacity. At dequeue thread, it will also try to shrink the queue capacity when size
180 // fall under half of the capacity.
TEST(FixedQueue,ConcurrentPushPopWithResize)181 TEST(FixedQueue, ConcurrentPushPopWithResize)
182 {
183 static constexpr size_t kInitialQueueCapacity = 64;
184 static constexpr size_t kMaxQueueCapacity = 64 * 1024;
185 FixedQueue<uint64_t> q(kInitialQueueCapacity);
186 double timeOut = 1.0;
187 uint64_t kMaxLoop = 1000000ull;
188 std::atomic<bool> enqueueThreadFinished(false);
189 std::atomic<bool> dequeueThreadFinished(false);
190 std::mutex enqueueMutex;
191 std::mutex dequeueMutex;
192
193 std::thread enqueueThread = std::thread([&]() {
194 std::time_t t1 = std::time(nullptr);
195 uint64_t value = 0;
196 do
197 {
198 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
199 if (q.full())
200 {
201 // Take both lock to ensure no one will access while we try to double the
202 // storage. Note that under a well balanced system, this should happen infrequently.
203 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
204 // Check again to see if queue is still full after taking the dequeueMutex.
205 size_t newCapacity = q.capacity() * 2;
206 if (q.full() && newCapacity < kMaxQueueCapacity)
207 {
208 // Double the storage size while we took the lock
209 q.updateCapacity(newCapacity);
210 }
211 }
212
213 // If queue is still full, lets wait for dequeue thread to make some progress
214 while (q.full() && !dequeueThreadFinished)
215 {
216 enqueueLock.unlock();
217 std::this_thread::sleep_for(std::chrono::microseconds(1));
218 enqueueLock.lock();
219 }
220
221 if (dequeueThreadFinished)
222 {
223 break;
224 }
225
226 q.push(value);
227 value++;
228 } while (difftime(std::time(nullptr), t1) < timeOut && value < kMaxLoop &&
229 !dequeueThreadFinished);
230 enqueueThreadFinished = true;
231 });
232
233 std::thread dequeueThread = std::thread([&]() {
234 std::time_t t1 = std::time(nullptr);
235 uint64_t expectedValue = 0;
236 do
237 {
238 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
239 if (q.size() < q.capacity() / 10 && q.capacity() > kInitialQueueCapacity)
240 {
241 // Shrink the storage if we only used less than 10% of storage. We must take both
242 // lock to ensure no one is accessing it when we update storage. And the lock must
243 // take in the same order as other thread to avoid deadlock.
244 dequeueLock.unlock();
245 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
246 dequeueLock.lock();
247 // Figure out what the new capacity should be
248 size_t newCapacity = q.capacity() / 2;
249 while (q.size() < newCapacity)
250 {
251 newCapacity /= 2;
252 }
253 newCapacity *= 2;
254 newCapacity = std::max(newCapacity, kInitialQueueCapacity);
255
256 q.updateCapacity(newCapacity);
257 }
258
259 while (q.empty() && !enqueueThreadFinished)
260 {
261 dequeueLock.unlock();
262 std::this_thread::sleep_for(std::chrono::microseconds(1));
263 dequeueLock.lock();
264 }
265
266 ASSERT(expectedValue == q.front());
267 // test pop
268 q.pop();
269 expectedValue++;
270 } while (difftime(std::time(nullptr), t1) < timeOut && expectedValue < kMaxLoop &&
271 !enqueueThreadFinished);
272 dequeueThreadFinished = true;
273 });
274
275 enqueueThread.join();
276 dequeueThread.join();
277 }
278
279 // Test clearing the queue
TEST(FixedQueue,Clear)280 TEST(FixedQueue, Clear)
281 {
282 FixedQueue<int> q(5);
283 for (int i = 0; i < 5; ++i)
284 {
285 q.push(i);
286 }
287 q.clear();
288 EXPECT_EQ(0u, q.size());
289 EXPECT_EQ(true, q.empty());
290 }
291 } // namespace angle
292