• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2023 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // CircularBuffer_unittest:
7 //   Tests of the CircularBuffer class
8 //
9 
10 #include <gtest/gtest.h>
11 
12 #include "common/FixedQueue.h"
13 
14 #include <chrono>
15 #include <thread>
16 
17 namespace angle
18 {
19 // Make sure the various constructors compile and do basic checks
TEST(FixedQueue,Constructors)20 TEST(FixedQueue, Constructors)
21 {
22     FixedQueue<int> q(5);
23     EXPECT_EQ(0u, q.size());
24     EXPECT_EQ(true, q.empty());
25 }
26 
27 // Make sure the destructor destroys all elements.
TEST(FixedQueue,Destructor)28 TEST(FixedQueue, Destructor)
29 {
30     struct s
31     {
32         s() : counter(nullptr) {}
33         s(int *c) : counter(c) {}
34         ~s()
35         {
36             if (counter)
37             {
38                 ++*counter;
39             }
40         }
41 
42         s(const s &)            = default;
43         s &operator=(const s &) = default;
44 
45         int *counter;
46     };
47 
48     int destructorCount = 0;
49 
50     {
51         FixedQueue<s> q(11);
52         q.push(s(&destructorCount));
53         // Destructor called once for the temporary above.
54         EXPECT_EQ(1, destructorCount);
55     }
56 
57     // Destructor should be called one more time for the element we pushed.
58     EXPECT_EQ(2, destructorCount);
59 }
60 
61 // Make sure the pop destroys the element.
TEST(FixedQueue,Pop)62 TEST(FixedQueue, Pop)
63 {
64     struct s
65     {
66         s() : counter(nullptr) {}
67         s(int *c) : counter(c) {}
68         ~s()
69         {
70             if (counter)
71             {
72                 ++*counter;
73             }
74         }
75 
76         s(const s &) = default;
77         s &operator=(const s &s)
78         {
79             // increment if we are overwriting the custom initialized object
80             if (counter)
81             {
82                 ++*counter;
83             }
84             counter = s.counter;
85             return *this;
86         }
87 
88         int *counter;
89     };
90 
91     int destructorCount = 0;
92 
93     FixedQueue<s> q(11);
94     q.push(s(&destructorCount));
95     // Destructor called once for the temporary above.
96     EXPECT_EQ(1, destructorCount);
97     q.pop();
98     // Copy assignment should be called for the element we popped.
99     EXPECT_EQ(2, destructorCount);
100 }
101 
102 // Test circulating behavior.
TEST(FixedQueue,WrapAround)103 TEST(FixedQueue, WrapAround)
104 {
105     FixedQueue<int> q(7);
106 
107     for (int i = 0; i < 7; ++i)
108     {
109         q.push(i);
110     }
111 
112     EXPECT_EQ(0, q.front());
113     q.pop();
114     // This should wrap around
115     q.push(7);
116     for (int i = 0; i < 7; ++i)
117     {
118         EXPECT_EQ(i + 1, q.front());
119         q.pop();
120     }
121 }
122 
123 // Test concurrent push and pop behavior.
TEST(FixedQueue,ConcurrentPushPop)124 TEST(FixedQueue, ConcurrentPushPop)
125 {
126     FixedQueue<uint64_t> q(7);
127     double timeOut    = 1.0;
128     uint64_t kMaxLoop = 1000000ull;
129     std::atomic<bool> enqueueThreadFinished;
130     enqueueThreadFinished = false;
131     std::atomic<bool> dequeueThreadFinished;
132     dequeueThreadFinished = false;
133 
134     std::thread enqueueThread = std::thread([&]() {
135         std::time_t t1 = std::time(nullptr);
136         uint64_t value = 0;
137         do
138         {
139             while (q.full() && !dequeueThreadFinished)
140             {
141                 std::this_thread::sleep_for(std::chrono::microseconds(1));
142             }
143             if (dequeueThreadFinished)
144             {
145                 break;
146             }
147             q.push(value);
148             value++;
149         } while (difftime(std::time(nullptr), t1) < timeOut && value < kMaxLoop);
150         ASSERT(difftime(std::time(nullptr), t1) >= timeOut || value >= kMaxLoop);
151         enqueueThreadFinished = true;
152     });
153 
154     std::thread dequeueThread = std::thread([&]() {
155         std::time_t t1         = std::time(nullptr);
156         uint64_t expectedValue = 0;
157         do
158         {
159             while (q.empty() && !enqueueThreadFinished)
160             {
161                 std::this_thread::sleep_for(std::chrono::microseconds(1));
162             }
163 
164             EXPECT_EQ(expectedValue, q.front());
165             // test pop
166             q.pop();
167 
168             expectedValue++;
169         } while (difftime(std::time(nullptr), t1) < timeOut && expectedValue < kMaxLoop);
170         ASSERT(difftime(std::time(nullptr), t1) >= timeOut || expectedValue >= kMaxLoop);
171         dequeueThreadFinished = true;
172     });
173 
174     enqueueThread.join();
175     dequeueThread.join();
176 }
177 
178 // Test concurrent push and pop behavior. When queue is full, instead of wait, it will try to
179 // increase capacity. At dequeue thread, it will also try to shrink the queue capacity when size
180 // fall under half of the capacity.
TEST(FixedQueue,ConcurrentPushPopWithResize)181 TEST(FixedQueue, ConcurrentPushPopWithResize)
182 {
183     static constexpr size_t kInitialQueueCapacity = 64;
184     static constexpr size_t kMaxQueueCapacity     = 64 * 1024;
185     FixedQueue<uint64_t> q(kInitialQueueCapacity);
186     double timeOut    = 1.0;
187     uint64_t kMaxLoop = 1000000ull;
188     std::atomic<bool> enqueueThreadFinished(false);
189     std::atomic<bool> dequeueThreadFinished(false);
190     std::mutex enqueueMutex;
191     std::mutex dequeueMutex;
192 
193     std::thread enqueueThread = std::thread([&]() {
194         std::time_t t1 = std::time(nullptr);
195         uint64_t value = 0;
196         do
197         {
198             std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
199             if (q.full())
200             {
201                 // Take both lock to ensure no one will access while we try to double the
202                 // storage. Note that under a well balanced system, this should happen infrequently.
203                 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
204                 // Check again to see if queue is still full after taking the dequeueMutex.
205                 size_t newCapacity = q.capacity() * 2;
206                 if (q.full() && newCapacity < kMaxQueueCapacity)
207                 {
208                     // Double the storage size while we took the lock
209                     q.updateCapacity(newCapacity);
210                 }
211             }
212 
213             // If queue is still full, lets wait for dequeue thread to make some progress
214             while (q.full() && !dequeueThreadFinished)
215             {
216                 enqueueLock.unlock();
217                 std::this_thread::sleep_for(std::chrono::microseconds(1));
218                 enqueueLock.lock();
219             }
220 
221             if (dequeueThreadFinished)
222             {
223                 break;
224             }
225 
226             q.push(value);
227             value++;
228         } while (difftime(std::time(nullptr), t1) < timeOut && value < kMaxLoop &&
229                  !dequeueThreadFinished);
230         enqueueThreadFinished = true;
231     });
232 
233     std::thread dequeueThread = std::thread([&]() {
234         std::time_t t1         = std::time(nullptr);
235         uint64_t expectedValue = 0;
236         do
237         {
238             std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
239             if (q.size() < q.capacity() / 10 && q.capacity() > kInitialQueueCapacity)
240             {
241                 // Shrink the storage if we only used less than 10% of storage. We must take both
242                 // lock to ensure no one is accessing it when we update storage. And the lock must
243                 // take in the same order as other thread to avoid deadlock.
244                 dequeueLock.unlock();
245                 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
246                 dequeueLock.lock();
247                 // Figure out what the new capacity should be
248                 size_t newCapacity = q.capacity() / 2;
249                 while (q.size() < newCapacity)
250                 {
251                     newCapacity /= 2;
252                 }
253                 newCapacity *= 2;
254                 newCapacity = std::max(newCapacity, kInitialQueueCapacity);
255 
256                 q.updateCapacity(newCapacity);
257             }
258 
259             while (q.empty() && !enqueueThreadFinished)
260             {
261                 dequeueLock.unlock();
262                 std::this_thread::sleep_for(std::chrono::microseconds(1));
263                 dequeueLock.lock();
264             }
265 
266             ASSERT(expectedValue == q.front());
267             // test pop
268             q.pop();
269             expectedValue++;
270         } while (difftime(std::time(nullptr), t1) < timeOut && expectedValue < kMaxLoop &&
271                  !enqueueThreadFinished);
272         dequeueThreadFinished = true;
273     });
274 
275     enqueueThread.join();
276     dequeueThread.join();
277 }
278 
279 // Test clearing the queue
TEST(FixedQueue,Clear)280 TEST(FixedQueue, Clear)
281 {
282     FixedQueue<int> q(5);
283     for (int i = 0; i < 5; ++i)
284     {
285         q.push(i);
286     }
287     q.clear();
288     EXPECT_EQ(0u, q.size());
289     EXPECT_EQ(true, q.empty());
290 }
291 }  // namespace angle
292