1 //
2 // Copyright 2023 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // FixedQueue.h:
7 // An array based fifo queue class that supports concurrent push and pop.
8 //
9
10 #ifndef COMMON_FIXEDQUEUE_H_
11 #define COMMON_FIXEDQUEUE_H_
12
13 #include "common/debug.h"
14
15 #include <algorithm>
16 #include <array>
17 #include <atomic>
18
19 namespace angle
20 {
21 // class FixedQueue: An vector based fifo queue class that supports concurrent push and
22 // pop. Caller must ensure queue is not empty before pop and not full before push. This class
23 // supports concurrent push and pop from different threads, but only with single producer single
24 // consumer usage. If caller want to push from two different threads, proper mutex must be used to
25 // ensure the access is serialized. You can also call updateCapacity to adjust the storage size, but
26 // caller must take proper mutex lock to ensure no one is accessing the storage. In a typical usage
27 // case is that you have two mutex lock, enqueueLock and dequeueLock. You use enqueueLock to push
28 // and use dequeueLock to pop. You dont need the lock for checking size (i.e, call empty/full). You
29 // take both lock in a given order to call updateCapacity. See unit test
30 // FixedQueue.ConcurrentPushPopWithResize for example.
31 template <class T>
32 class FixedQueue final : angle::NonCopyable
33 {
34 public:
35 using Storage = std::vector<T>;
36 using value_type = typename Storage::value_type;
37 using size_type = typename Storage::size_type;
38 using reference = typename Storage::reference;
39 using const_reference = typename Storage::const_reference;
40
41 FixedQueue(size_t capacity);
42 ~FixedQueue();
43
44 size_type size() const;
45 bool empty() const;
46 bool full() const;
47
48 size_type capacity() const;
49 // Caller must ensure no one is accessing the data while update storage. This should happen
50 // infrequently since all data will be copied between old storage and new storage.
51 void updateCapacity(size_t newCapacity);
52
53 reference front();
54 const_reference front() const;
55
56 void push(const value_type &value);
57 void push(value_type &&value);
58
59 reference back();
60 const_reference back() const;
61
62 void pop();
63 void clear();
64
65 private:
66 Storage mData;
67 // The front and back indices are virtual indices (think about queue sizd is infinite). They
68 // will never wrap around when hit N. The wrap around occur when element is referenced. Virtual
69 // index for current head
70 size_type mFrontIndex;
71 // Virtual index for next write.
72 size_type mEndIndex;
73 // Atomic so that we can support concurrent push and pop.
74 std::atomic<size_type> mSize;
75 size_type mMaxSize;
76 };
77
78 template <class T>
FixedQueue(size_t capacity)79 FixedQueue<T>::FixedQueue(size_t capacity)
80 : mFrontIndex(0), mEndIndex(0), mSize(0), mMaxSize(capacity)
81 {
82 mData.resize(mMaxSize);
83 }
84
85 template <class T>
~FixedQueue()86 FixedQueue<T>::~FixedQueue()
87 {
88 mData.clear();
89 }
90
91 template <class T>
size()92 ANGLE_INLINE typename FixedQueue<T>::size_type FixedQueue<T>::size() const
93 {
94 return mSize;
95 }
96
97 template <class T>
empty()98 ANGLE_INLINE bool FixedQueue<T>::empty() const
99 {
100 return mSize == 0;
101 }
102
103 template <class T>
full()104 ANGLE_INLINE bool FixedQueue<T>::full() const
105 {
106 return mSize >= mMaxSize;
107 }
108
109 template <class T>
capacity()110 ANGLE_INLINE typename FixedQueue<T>::size_type FixedQueue<T>::capacity() const
111 {
112 return mMaxSize;
113 }
114
115 template <class T>
updateCapacity(size_t newCapacity)116 ANGLE_INLINE void FixedQueue<T>::updateCapacity(size_t newCapacity)
117 {
118 ASSERT(newCapacity >= mSize);
119 Storage newData(newCapacity);
120 for (size_type i = mFrontIndex; i < mEndIndex; i++)
121 {
122 newData[i % newCapacity] = std::move(mData[i % mMaxSize]);
123 }
124 mData.clear();
125 std::swap(newData, mData);
126 mMaxSize = newCapacity;
127 ASSERT(mData.size() == mMaxSize);
128 }
129
130 template <class T>
front()131 ANGLE_INLINE typename FixedQueue<T>::reference FixedQueue<T>::front()
132 {
133 ASSERT(mSize > 0);
134 return mData[mFrontIndex % mMaxSize];
135 }
136
137 template <class T>
front()138 ANGLE_INLINE typename FixedQueue<T>::const_reference FixedQueue<T>::front() const
139 {
140 ASSERT(mSize > 0);
141 return mData[mFrontIndex % mMaxSize];
142 }
143
144 template <class T>
push(const value_type & value)145 void FixedQueue<T>::push(const value_type &value)
146 {
147 ASSERT(mSize < mMaxSize);
148 mData[mEndIndex % mMaxSize] = value;
149 mEndIndex++;
150 // We must increment size last, after we wrote data. That way if another thread is doing
151 // `if(!dq.empty()){ s = dq.front(); }`, it will only see not empty until element is fully
152 // pushed.
153 mSize++;
154 }
155
156 template <class T>
push(value_type && value)157 void FixedQueue<T>::push(value_type &&value)
158 {
159 ASSERT(mSize < mMaxSize);
160 mData[mEndIndex % mMaxSize] = std::move(value);
161 mEndIndex++;
162 // We must increment size last, after we wrote data. That way if another thread is doing
163 // `if(!dq.empty()){ s = dq.front(); }`, it will only see not empty until element is fully
164 // pushed.
165 mSize++;
166 }
167
168 template <class T>
back()169 ANGLE_INLINE typename FixedQueue<T>::reference FixedQueue<T>::back()
170 {
171 ASSERT(mSize > 0);
172 return mData[(mEndIndex + (mMaxSize - 1)) % mMaxSize];
173 }
174
175 template <class T>
back()176 ANGLE_INLINE typename FixedQueue<T>::const_reference FixedQueue<T>::back() const
177 {
178 ASSERT(mSize > 0);
179 return mData[(mEndIndex + (mMaxSize - 1)) % mMaxSize];
180 }
181
182 template <class T>
pop()183 void FixedQueue<T>::pop()
184 {
185 ASSERT(mSize > 0);
186 mData[mFrontIndex % mMaxSize] = value_type();
187 mFrontIndex++;
188 // We must decrement size last, after we wrote data. That way if another thread is doing
189 // `if(!dq.full()){ dq.push; }`, it will only see not full until element is fully popped.
190 mSize--;
191 }
192
193 template <class T>
clear()194 void FixedQueue<T>::clear()
195 {
196 // Size will change in the "pop()" and also by "push()" calls from other thread.
197 const size_type localSize = mSize;
198 for (size_type i = 0; i < localSize; i++)
199 {
200 pop();
201 }
202 }
203 } // namespace angle
204
205 #endif // COMMON_FIXEDQUEUE_H_
206