1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use crossbeam_queue::ArrayQueue;
4 use crossbeam_utils::thread::scope;
5 use rand::{thread_rng, Rng};
6
/// Round-trips two values through a capacity-1 queue, one at a time, and
/// checks that the queue reports empty once both have been popped.
#[test]
fn smoke() {
    let queue = ArrayQueue::new(1);

    // Each value must come back out before the next one can go in.
    for value in 7..=8 {
        queue.push(value).unwrap();
        assert_eq!(queue.pop(), Some(value));
    }

    assert!(queue.pop().is_none());
}
18
/// `capacity()` must report exactly the size the queue was constructed with,
/// for every size from 1 through 9.
#[test]
fn capacity() {
    (1..10).for_each(|requested| {
        let queue = ArrayQueue::<i32>::new(requested);
        assert_eq!(queue.capacity(), requested);
    });
}
26
/// Constructing a queue with capacity zero is expected to panic with the
/// message matched by `should_panic` below.
#[test]
#[should_panic(expected = "capacity must be non-zero")]
fn zero_capacity() {
    // The constructor call itself is what should panic; the value is never used.
    drop(ArrayQueue::<i32>::new(0));
}
32
/// Walks a capacity-2 queue through empty -> one item -> full -> one item and
/// checks `len()`, `is_empty()` and `is_full()` at every step.
#[test]
fn len_empty_full() {
    let queue = ArrayQueue::new(2);

    // Compares the observable state against `(len, is_empty, is_full)`.
    let expect_state = |state: (usize, bool, bool)| {
        assert_eq!((queue.len(), queue.is_empty(), queue.is_full()), state);
    };

    expect_state((0, true, false));

    queue.push(()).unwrap();
    expect_state((1, false, false));

    queue.push(()).unwrap();
    expect_state((2, false, true));

    queue.pop().unwrap();
    expect_state((1, false, false));
}
59
/// Exercises `len()` in three phases:
///
/// 1. repeated single-threaded fill-to-50 / drain-to-0 cycles, checking the
///    exact length after every operation;
/// 2. a single-threaded fill to full capacity followed by a complete drain;
/// 3. one producer and one consumer running concurrently, where the length
///    may only be bounded (`<= CAP`) rather than predicted exactly.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn len() {
    const COUNT: usize = 25_000;
    const CAP: usize = 1000;

    let queue = ArrayQueue::new(CAP);
    assert_eq!(queue.len(), 0);

    for _ in 0..CAP / 10 {
        // `filled` is the number of items held after this push.
        for filled in 1..=50 {
            queue.push(filled - 1).unwrap();
            assert_eq!(queue.len(), filled);
        }

        // `left` is the number of items held after this pop.
        for left in (0..50).rev() {
            queue.pop().unwrap();
            assert_eq!(queue.len(), left);
        }
    }
    assert_eq!(queue.len(), 0);

    for filled in 1..=CAP {
        queue.push(filled - 1).unwrap();
        assert_eq!(queue.len(), filled);
    }

    (0..CAP).for_each(|_| {
        queue.pop().unwrap();
    });
    assert_eq!(queue.len(), 0);

    scope(|s| {
        // Consumer: items must arrive in the order they were produced.
        s.spawn(|_| {
            for expected in 0..COUNT {
                // Spin until the producer has made the next item available.
                let received = loop {
                    if let Some(item) = queue.pop() {
                        break item;
                    }
                };
                assert_eq!(received, expected);
                assert!(queue.len() <= CAP);
            }
        });

        // Producer: spin while the queue is full.
        s.spawn(|_| {
            for item in 0..COUNT {
                while queue.push(item).is_err() {}
                assert!(queue.len() <= CAP);
            }
        });
    })
    .unwrap();
    assert_eq!(queue.len(), 0);
}
117
/// Single producer, single consumer over a capacity-3 queue: the consumer
/// must receive `0..COUNT` in order and find the queue empty afterwards.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn spsc() {
    const COUNT: usize = 100_000;

    let queue = ArrayQueue::new(3);

    scope(|s| {
        // Consumer side.
        s.spawn(|_| {
            for expected in 0..COUNT {
                // Spin until the next item shows up.
                let received = loop {
                    match queue.pop() {
                        Some(item) => break item,
                        None => continue,
                    }
                };
                assert_eq!(received, expected);
            }
            // Everything the producer sends has been consumed by now.
            assert!(queue.pop().is_none());
        });

        // Producer side: retry each push until a slot frees up.
        s.spawn(|_| {
            for item in 0..COUNT {
                while queue.push(item).is_err() {}
            }
        });
    })
    .unwrap();
}
146
/// Multiple producers, multiple consumers over a capacity-3 queue.
///
/// Each of the `THREADS` producers pushes every value in `0..COUNT`, and each
/// of the `THREADS` consumers pops `COUNT` items, bumping a per-value counter.
/// At the end every value must have been seen exactly `THREADS` times.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let queue = ArrayQueue::<usize>::new(3);
    // One counter per distinct value that can travel through the queue.
    let seen: Vec<AtomicUsize> = (0..COUNT).map(|_| AtomicUsize::new(0)).collect();

    scope(|s| {
        // Consumers.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for _ in 0..COUNT {
                    // Spin until an item is available.
                    let value = loop {
                        match queue.pop() {
                            Some(item) => break item,
                            None => continue,
                        }
                    };
                    seen[value].fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        // Producers.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for value in 0..COUNT {
                    while queue.push(value).is_err() {}
                }
            });
        }
    })
    .unwrap();

    for counter in &seen {
        assert_eq!(counter.load(Ordering::SeqCst), THREADS);
    }
}
183
/// Verifies that every element is dropped exactly once.
///
/// Each run moves a random number of `DropCounter`s through the queue with
/// one producer and one consumer, then leaves a random number of extra
/// elements inside. The global drop count must equal the number of consumed
/// elements before the queue is dropped, and grow by the number of leftover
/// elements once it is.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
    const RUNS: usize = 100;

    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..RUNS {
        let transferred = rng.gen_range(0..10_000);
        let leftover = rng.gen_range(0..50);

        DROPS.store(0, Ordering::SeqCst);
        let queue = ArrayQueue::new(50);

        scope(|s| {
            // Consumer: every successfully popped element is dropped right away.
            s.spawn(|_| {
                for _ in 0..transferred {
                    loop {
                        if queue.pop().is_some() {
                            break;
                        }
                    }
                }
            });

            // Producer: a rejected push hands the element back; dropping it
            // bumps DROPS, so undo that to count only consumed elements.
            s.spawn(|_| {
                for _ in 0..transferred {
                    while let Err(rejected) = queue.push(DropCounter) {
                        drop(rejected);
                        DROPS.fetch_sub(1, Ordering::SeqCst);
                    }
                }
            });
        })
        .unwrap();

        (0..leftover).for_each(|_| queue.push(DropCounter).unwrap());

        // Only the consumed elements have been dropped so far.
        assert_eq!(DROPS.load(Ordering::SeqCst), transferred);
        drop(queue);
        // Dropping the queue must drop what was still stored in it.
        assert_eq!(DROPS.load(Ordering::SeqCst), transferred + leftover);
    }
}
235
/// `THREADS` threads share a queue of capacity `THREADS`; each repeatedly
/// pushes one item and then pops one.
///
/// A thread only pops after its own push has succeeded, so the test expects
/// `pop` never to report an empty queue (the `unwrap` would panic otherwise).
#[test]
fn linearizable() {
    // Fewer iterations under Miri to keep the run time reasonable.
    #[cfg(miri)]
    const COUNT: usize = 500;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let queue = ArrayQueue::new(THREADS);

    scope(|s| {
        for _ in 0..THREADS {
            s.spawn(|_| {
                for _ in 0..COUNT {
                    // Retry until the push goes through, then take one item out.
                    loop {
                        if queue.push(0).is_ok() {
                            break;
                        }
                    }
                    queue.pop().unwrap();
                }
            });
        }
    })
    .unwrap();
}
258
/// Fills a queue with `0..LEN` and checks that consuming it with
/// `into_iter()` yields exactly those values, in insertion order.
///
/// The yielded items are collected and compared against the full expected
/// sequence, so an iterator that stops early (or yields nothing at all) fails
/// the test instead of passing with zero or too few loop iterations.
#[test]
fn into_iter() {
    const LEN: usize = 100;

    let q = ArrayQueue::new(LEN);
    for i in 0..LEN {
        q.push(i).unwrap();
    }

    let drained: Vec<usize> = q.into_iter().collect();
    let expected: Vec<usize> = (0..LEN).collect();
    assert_eq!(drained, expected);
}
269