1 //! Tests for the after channel flavor.
2
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, select, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10
/// Builds a [`Duration`] spanning `ms` milliseconds.
///
/// Short helper so the tests in this file can write timer lengths as `ms(50)`.
fn ms(ms: u64) -> Duration {
    // Split into whole seconds plus the sub-second remainder. The remainder is
    // below 1000, so scaling it to nanoseconds stays under one second and
    // always fits in a `u32`.
    let whole_secs = ms / 1_000;
    let leftover_nanos = (ms % 1_000) as u32 * 1_000_000;
    Duration::new(whole_secs, leftover_nanos)
}
14
/// Checks that an `after(50ms)` receiver is empty before its deadline, yields a
/// single `Instant` once the deadline has passed, and is empty again afterwards.
#[test]
fn fire() {
    let begin = Instant::now();
    let timer = after(ms(50));

    // Nothing is available immediately after creation.
    assert_eq!(timer.try_recv(), Err(TryRecvError::Empty));
    thread::sleep(ms(100));

    // The delivered instant lies at least 50ms after the starting point.
    let delivered = timer.try_recv().unwrap();
    assert!(begin < delivered);
    assert!(delivered.duration_since(begin) >= ms(50));

    // After the 100ms sleep, the delivered instant is at least 50ms in the past.
    let after_sleep = Instant::now();
    assert!(delivered < after_sleep);
    assert!(after_sleep.duration_since(delivered) >= ms(50));

    // The message has been taken; a second attempt finds nothing.
    assert_eq!(timer.try_recv(), Err(TryRecvError::Empty));

    // A non-blocking select must fall through to `default`.
    select! {
        recv(timer) -> _ => panic!(),
        default => {}
    }

    // A blocking select must be won by the fresh 200ms timer, not the spent one.
    select! {
        recv(timer) -> _ => panic!(),
        recv(after(ms(200))) -> _ => {}
    }
}
43
/// Checks that `capacity()` reports `Some(1)` for `after` receivers created with
/// delays of 0 up to (but not including) `COUNT` milliseconds.
#[test]
fn capacity() {
    const COUNT: usize = 10;

    (0..COUNT)
        .map(|delay| after(ms(delay as u64)))
        .for_each(|timer| assert_eq!(timer.capacity(), Some(1)));
}
53
/// Checks `len`, `is_empty` and `is_full` on an `after(50ms)` receiver in three
/// phases: before the deadline, once it has passed, and after the message is taken.
#[test]
fn len_empty_full() {
    let r = after(ms(50));

    // Phase 1: the deadline has not been reached yet.
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());

    thread::sleep(ms(100));

    // Phase 2: the deadline has passed and the message is waiting.
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(r.is_full());

    r.try_recv().unwrap();

    // Phase 3: the message has been consumed.
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());
}
74
/// Checks `try_recv` on an `after(200ms)` receiver: empty before the deadline,
/// exactly one successful receive after it, and empty again from then on.
#[test]
fn try_recv() {
    let r = after(ms(200));
    // Pin the exact error variant, as the other tests in this file do, rather
    // than accepting any error.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

    // 100ms in: still before the 200ms deadline.
    thread::sleep(ms(100));
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

    // 300ms in: the message is available exactly once.
    thread::sleep(ms(200));
    assert!(r.try_recv().is_ok());
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

    // 500ms in: no further message shows up.
    thread::sleep(ms(200));
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
90
/// Checks that a blocking `recv` on an `after(50ms)` receiver returns an instant
/// at least 50ms after the start, and that the call returns promptly afterwards.
#[test]
fn recv() {
    let begin = Instant::now();
    let timer = after(ms(50));

    let delivered = timer.recv().unwrap();
    assert!(begin < delivered);
    let waited = delivered.duration_since(begin);
    assert!(waited >= ms(50));

    // The time between the delivered instant and `recv` returning must be
    // shorter than the wait itself.
    let returned_at = Instant::now();
    assert!(delivered < returned_at);
    assert!(returned_at.duration_since(delivered) < waited);

    // The single message has been consumed.
    assert_eq!(timer.try_recv(), Err(TryRecvError::Empty));
}
106
/// Checks `recv_timeout` against an `after(200ms)` receiver: a 100ms wait times
/// out, the next wait receives the message, and a third wait times out again.
/// Each step is also checked against a 50ms timing window.
#[test]
fn recv_timeout() {
    let begin = Instant::now();
    let timer = after(ms(200));

    // First wait: 100ms is shorter than the timer, so it must fail after ~100ms.
    assert!(timer.recv_timeout(ms(100)).is_err());
    let elapsed = begin.elapsed();
    assert!(elapsed >= ms(100));
    assert!(elapsed <= ms(150));

    // Second wait: the message arrives, stamped ~200ms after the start.
    let delivered = timer.recv_timeout(ms(200)).unwrap();
    let offset = delivered.duration_since(begin);
    assert!(offset >= ms(200));
    assert!(offset <= ms(250));

    // Third wait: nothing more to receive, so the full 200ms timeout elapses.
    assert!(timer.recv_timeout(ms(200)).is_err());
    let elapsed = begin.elapsed();
    assert!(elapsed >= ms(400));
    assert!(elapsed <= ms(450));

    assert_eq!(timer.try_recv(), Err(TryRecvError::Empty));
}
128
/// Runs two threads that each block in a `select!` over the same pair of 50ms
/// `after` receivers; the test passes when both threads complete.
#[test]
fn recv_two() {
    let r1 = after(ms(50));
    let r2 = after(ms(50));

    scope(|scope| {
        // Both workers run the same select over the shared receivers.
        for _ in 0..2 {
            scope.spawn(|_| {
                select! {
                    recv(r1) -> _ => {}
                    recv(r2) -> _ => {}
                }
            });
        }
    })
    .unwrap();
}
150
/// Checks that when a 50ms and a 100ms timer are raced in a `select!`, the 50ms
/// one wins whichever arm it is listed in.
#[test]
fn recv_race() {
    // Shorter timer listed first.
    let fast = after(ms(50));
    let slow = after(ms(100));
    select! {
        recv(fast) -> _ => {}
        recv(slow) -> _ => panic!(),
    }

    // Shorter timer listed second.
    let slow = after(ms(100));
    let fast = after(ms(50));
    select! {
        recv(slow) -> _ => panic!(),
        recv(fast) -> _ => {}
    }
}
163
/// Repeatedly checks how `after` interacts with a `default` arm: a zero-length
/// timer must win over `default`, while a 100ms timer must lose to it.
#[test]
fn stress_default() {
    const COUNT: usize = 10;

    // A 0ms timer must be picked instead of `default`.
    (0..COUNT).for_each(|_| {
        select! {
            recv(after(ms(0))) -> _ => {}
            default => panic!(),
        }
    });

    // A 100ms timer must not be picked; `default` has to be taken.
    (0..COUNT).for_each(|_| {
        select! {
            recv(after(ms(100))) -> _ => panic!(),
            default => {}
        }
    });
}
182
/// Has `THREADS` workers drain `COUNT` short `after` receivers through
/// `Select::select`, and checks that each message is received exactly once.
///
/// Every worker round adds a fresh `TIMEOUT_MS` timer as the last operation; a
/// worker stops when that timer is the selected operation.
#[test]
fn select() {
    const THREADS: usize = 4;
    const COUNT: usize = 1000;
    const TIMEOUT_MS: u64 = 100;

    let timers: Vec<_> = (0..COUNT)
        .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
        .collect();
    let hits = AtomicUsize::new(0);

    scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|_| {
                let watched: Vec<&_> = timers.iter().collect();

                loop {
                    let timeout = after(ms(TIMEOUT_MS));
                    let mut sel = Select::new();
                    for &r in &watched {
                        sel.recv(r);
                    }
                    // Registered last, so its index identifies the timeout operation.
                    let oper_timeout = sel.recv(&timeout);

                    let oper = sel.select();
                    let idx = oper.index();
                    if idx == oper_timeout {
                        oper.recv(&timeout).unwrap();
                        break;
                    }

                    // One of the shared timers was selected; complete the receive.
                    oper.recv(watched[idx]).unwrap();
                    hits.fetch_add(1, Ordering::SeqCst);
                }
            });
        }
    })
    .unwrap();

    assert_eq!(hits.load(Ordering::SeqCst), COUNT);
}
226
/// Has `THREADS` workers drain `COUNT` short `after` receivers through
/// `Select::ready`, and checks that each message is received exactly once.
///
/// `ready` only yields an operation index, so the receive itself is attempted
/// with `try_recv`; when that attempt fails, `ready` is simply called again.
#[test]
fn ready() {
    const THREADS: usize = 4;
    const COUNT: usize = 1000;
    const TIMEOUT_MS: u64 = 100;

    let timers: Vec<_> = (0..COUNT)
        .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
        .collect();
    let hits = AtomicUsize::new(0);

    scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|_| {
                let watched: Vec<&_> = timers.iter().collect();

                loop {
                    let timeout = after(ms(TIMEOUT_MS));
                    let mut sel = Select::new();
                    for &r in &watched {
                        sel.recv(r);
                    }
                    // Registered last, so its index identifies the timeout operation.
                    let oper_timeout = sel.recv(&timeout);

                    loop {
                        let idx = sel.ready();
                        if idx == oper_timeout {
                            // The round's timeout fired: this worker is done.
                            timeout.try_recv().unwrap();
                            return;
                        }
                        if watched[idx].try_recv().is_ok() {
                            // Got a message; start a new round with a fresh timeout.
                            hits.fetch_add(1, Ordering::SeqCst);
                            break;
                        }
                    }
                }
            });
        }
    })
    .unwrap();

    assert_eq!(hits.load(Ordering::SeqCst), COUNT);
}
269
/// Stress test for cloning and dropping `after` receivers from several threads
/// at once; it makes no assertions and passes when every worker finishes.
#[test]
fn stress_clone() {
    const RUNS: usize = 1000;
    const THREADS: usize = 10;
    const COUNT: usize = 50;

    for delay in 0..RUNS {
        let timer = after(ms(delay as u64));

        scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|_| {
                    // Each worker takes its own clone; the receive result is
                    // deliberately ignored.
                    let local = timer.clone();
                    let _ = local.try_recv();

                    (0..COUNT).for_each(|_| {
                        drop(local.clone());
                        thread::yield_now();
                    });
                });
            }
        })
        .unwrap();
    }
}
295
/// Checks that `select!` over two `after` receivers with equal delay (0ms, then
/// 1ms) picks each arm at least `COUNT / arms / 2` times over `COUNT` rounds.
#[test]
fn fairness() {
    const COUNT: usize = 1000;

    for dur in [0u64, 1].iter().copied() {
        let mut hits = [0usize; 2];

        for _ in 0..COUNT {
            select! {
                recv(after(ms(dur))) -> _ => hits[0] += 1,
                recv(after(ms(dur))) -> _ => hits[1] += 1,
            }
        }

        // Each arm must reach at least half of an even share.
        let floor = COUNT / hits.len() / 2;
        assert!(hits.iter().all(|&n| n >= floor));
    }
}
313
/// Checks that when the same `after` receiver appears in five `select!` arms,
/// each arm is picked at least `COUNT / arms / 2` times over `COUNT` rounds
/// (for delays of 0ms, then 1ms).
#[test]
fn fairness_duplicates() {
    const COUNT: usize = 1000;

    for dur in [0u64, 1].iter().copied() {
        let mut hits = [0usize; 5];

        for _ in 0..COUNT {
            let timer = after(ms(dur));
            select! {
                recv(timer) -> _ => hits[0] += 1,
                recv(timer) -> _ => hits[1] += 1,
                recv(timer) -> _ => hits[2] += 1,
                recv(timer) -> _ => hits[3] += 1,
                recv(timer) -> _ => hits[4] += 1,
            }
        }

        // Each arm must reach at least half of an even share.
        let floor = COUNT / hits.len() / 2;
        assert!(hits.iter().all(|&n| n >= floor));
    }
}
335